2026-03-22 03:30:21 +00:00
|
|
|
use crate::monitoring::MetricsStore;
|
|
|
|
|
use crate::state::StateManager;
|
|
|
|
|
use anyhow::Result;
|
|
|
|
|
use futures_util::{SinkExt, StreamExt};
|
|
|
|
|
use hyper::{Request, Response};
|
|
|
|
|
use hyper_ws_listener::WsStream;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::Instant;
|
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
|
|
|
use tracing::{debug, info};
|
|
|
|
|
|
|
|
|
|
use super::ApiHandler;
|
|
|
|
|
|
|
|
|
|
impl ApiHandler {
|
|
|
|
|
pub(super) async fn handle_websocket(
|
|
|
|
|
req: Request<hyper::Body>,
|
|
|
|
|
state_manager: Arc<StateManager>,
|
|
|
|
|
metrics_store: Arc<MetricsStore>,
|
|
|
|
|
) -> Result<Response<hyper::Body>> {
|
|
|
|
|
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
|
|
|
|
|
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
|
|
|
|
|
|
|
|
|
|
if let Some(ws_fut) = ws_fut_opt {
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let ws_stream: WsStream = match ws_fut.await {
|
|
|
|
|
Ok(Ok(s)) => s,
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
debug!("WebSocket handshake failed (hyper): {}", e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
debug!("WebSocket task join failed: {}", e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
metrics_store.increment_ws();
|
|
|
|
|
info!("WebSocket /ws/db connected");
|
|
|
|
|
|
|
|
|
|
let (mut tx, mut rx) = ws_stream.split();
|
|
|
|
|
|
fix: fresh-ISO feedback bug-bash — onboarding, status truthfulness, recovery, kiosk, logs
Fixes from real fresh-install feedback (Framework node .81) + its log bundle:
Backend:
- websocket: subscribe before initial snapshot — broadcasts in the gap were
silently lost, stranding clients on stale state until a hard refresh
(the "everything needs ctrl-r" bug: My Apps stuck Loading, App Store
stuck Checking, containers-scanned never arriving)
- crash recovery: check the crash marker BEFORE writing our own PID —
recovery had never run on any node (always saw its own PID and skipped);
PID-reuse guard via /proc cmdline
- boot status: pending-boot-starts registry (recovery, stack recovery,
reconciler, adoption) — scanner overlays queued-but-down apps as
Restarting instead of Stopped after a reboot; scanner-authored
Restarting resolves immediately on a settled scan (no transitional wedge)
- install deps: bounded wait (36x5s) when a dependency is installed but
still starting ("Waiting for Bitcoin to start…") instead of instant
rejection; dependency-gate rejections remove the optimistic entry (no
phantom Stopped tile) and surface as a notification
- seed backup: auth.setup persists the onboarding mnemonic as the
encrypted seed backup (reveal previously failed on EVERY node — nothing
ever wrote master_seed.enc); seed.restore stashes too; error sanitizer
lets seed/2FA errors through instead of "Check server logs"
- lnd: bitcoind.rpchost resolved from the running Bitcoin variant
(hardcoded bitcoin-knots broke Core nodes); manifest uses derived_env
- bitcoin status: clean human message for connection-reset/startup; raw
URLs + os-error chains no longer reach the app card
- fedimint-clientd: chown /var/lib/archipelago/fmcd to 1000:1000 (root-
created dir crash-looped the rootless container, EACCES) — first-boot
script + pre-start self-heal
- log volume (>1GB/day on a day-old node): journald caps drop-in (ISO +
bootstrap self-heal), bitcoind -printtoconsole=0 everywhere (90% of the
journal was IBD UpdateTip spam), tracing default debug→info
Frontend:
- Login: Enter advances to confirm field then submits; submit always
clickable with inline errors (was silently disabled on mismatch);
Restart Onboarding needs a confirming second click (the mismatch →
"onboarding restarted" trap)
- sync store: 30s state reconciliation + refetch on re-entrant connect;
20s containers-scanned escape hatch so Checking can never show forever;
fresh empty node reaches the real "no apps yet" state
- intro video: CRF20 re-encode (SSIM 0.988) + faststart — moov was at EOF
so playback needed the full 15MB first (the intro lag)
- backgrounds: 10 heaviest JPEGs → WebP q90 (9.4MB→6.6MB); 7 stayed JPEG
(WebP larger on noisy sources)
- Web5ConnectedNodes: drop unused template ref that failed vue-tsc -b
ISO/kiosk:
- nginx: /assets/ 404s no longer cached immutable for a year; HTTPS block
gained the missing /assets/ location (served index.html as images)
- kiosk: launcher/service spliced from configs/ at ISO build (stale
heredoc force-disabled GPU); MemoryHigh/Max 1200/1500→2200/2800M (kiosk
rode the reclaim throttle = the lag); firmware-intel-graphics +
firmware-amd-graphics (trixie split DMC blobs out of misc-nonfree)
Verified: cargo test 898/898 green, npm run build green with dist
contents confirmed (webp refs, lnd.png, faststart video, new strings).
Handover for ISO build + deploy: docs/HANDOVER-2026-07-02-iso-feedback.md
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 08:00:39 -04:00
|
|
|
// Subscribe BEFORE taking the initial snapshot. Messages are full
|
|
|
|
|
// data dumps keyed by a monotonic revision, so a broadcast that
|
|
|
|
|
// races the snapshot is at worst a harmless duplicate/newer dump
|
|
|
|
|
// delivered right after — but subscribing after the snapshot send
|
|
|
|
|
// (the old order) let any update in that window vanish forever,
|
|
|
|
|
// since a tokio broadcast channel never delivers sends that
|
|
|
|
|
// predate subscribe(). That silently stuck clients (e.g. a fresh
|
|
|
|
|
// install's post-boot container scan) on a stale initial snapshot
|
|
|
|
|
// until a full page reload opened a new connection past the race.
|
|
|
|
|
let mut state_rx = state_manager.subscribe();
|
|
|
|
|
|
2026-03-22 03:30:21 +00:00
|
|
|
let initial_msg = state_manager.get_initial_message().await;
|
|
|
|
|
if let Ok(json_msg) = serde_json::to_string(&initial_msg) {
|
|
|
|
|
if let Err(e) = tx.send(Message::Text(json_msg)).await {
|
|
|
|
|
debug!("Failed to send initial data: {}", e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
debug!("Sent initial data dump at revision {}", initial_msg.rev);
|
|
|
|
|
}
|
|
|
|
|
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
|
|
|
|
|
tokio::pin!(ping_interval);
|
|
|
|
|
let mut last_client_activity = Instant::now();
|
|
|
|
|
const INACTIVITY_TIMEOUT_SECS: u64 = 300; // 5 minutes
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
tokio::select! {
|
|
|
|
|
_ = ping_interval.tick() => {
|
|
|
|
|
// Check inactivity timeout
|
|
|
|
|
if last_client_activity.elapsed().as_secs() >= INACTIVITY_TIMEOUT_SECS {
|
|
|
|
|
info!("WebSocket client inactive for {}s, closing", INACTIVITY_TIMEOUT_SECS);
|
|
|
|
|
let _ = tx.send(Message::Close(None)).await;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if tx.send(Message::Ping(vec![])).await.is_err() {
|
|
|
|
|
debug!("Failed to send ping, connection likely closed");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
update = state_rx.recv() => {
|
|
|
|
|
match update {
|
|
|
|
|
Ok(msg) => {
|
|
|
|
|
if let Ok(json_msg) = serde_json::to_string(&msg) {
|
|
|
|
|
if let Err(e) = tx.send(Message::Text(json_msg)).await {
|
|
|
|
|
debug!("Failed to send state update: {}", e);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
debug!("Sent state update at revision {}", msg.rev);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
|
|
|
|
debug!("Client lagged behind, skipped {} messages", skipped);
|
|
|
|
|
}
|
|
|
|
|
Err(broadcast::error::RecvError::Closed) => {
|
|
|
|
|
debug!("Broadcast channel closed");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
msg = rx.next() => {
|
|
|
|
|
match msg {
|
|
|
|
|
Some(Ok(Message::Close(_))) => break,
|
|
|
|
|
Some(Ok(Message::Pong(_))) => {
|
|
|
|
|
last_client_activity = Instant::now();
|
|
|
|
|
debug!("Received pong");
|
|
|
|
|
}
|
|
|
|
|
Some(Ok(Message::Ping(data))) => {
|
|
|
|
|
last_client_activity = Instant::now();
|
|
|
|
|
let _ = tx.send(Message::Pong(data)).await;
|
|
|
|
|
}
|
|
|
|
|
Some(Ok(Message::Text(text))) => {
|
|
|
|
|
last_client_activity = Instant::now();
|
|
|
|
|
// Handle JSON ping from frontend
|
|
|
|
|
if text.contains("\"type\":\"ping\"") || text.contains("\"type\": \"ping\"") {
|
|
|
|
|
let _ = tx.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Some(Ok(_)) => {
|
|
|
|
|
last_client_activity = Instant::now();
|
|
|
|
|
}
|
|
|
|
|
Some(Err(e)) => {
|
|
|
|
|
debug!("WebSocket stream error: {}", e);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
None => break,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
metrics_store.decrement_ws();
|
|
|
|
|
info!("WebSocket /ws/db disconnected");
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(response)
|
|
|
|
|
}
|
|
|
|
|
}
|