refactor: split remote relay into own module, add lifecycle reconnect
- Move handle_remote_relay from remote_input.rs to remote_relay.rs - Android: lifecycle-aware WebSocket reconnection on app resume - Cleaner module boundaries between xdotool input and browser relay Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
7409cdaac2
commit
1690f67acf
@ -15,7 +15,6 @@ import androidx.compose.foundation.layout.windowInsetsPadding
|
||||
import androidx.compose.foundation.shape.CircleShape
|
||||
import androidx.compose.runtime.Composable
|
||||
import androidx.compose.runtime.DisposableEffect
|
||||
import androidx.compose.runtime.LaunchedEffect
|
||||
import androidx.compose.runtime.collectAsState
|
||||
import androidx.compose.runtime.getValue
|
||||
import androidx.compose.runtime.mutableStateOf
|
||||
@ -28,6 +27,9 @@ import androidx.compose.ui.draw.clip
|
||||
import androidx.compose.ui.graphics.Color
|
||||
import androidx.compose.ui.platform.LocalConfiguration
|
||||
import androidx.compose.ui.platform.LocalContext
|
||||
import androidx.compose.ui.platform.LocalLifecycleOwner
|
||||
import androidx.lifecycle.Lifecycle
|
||||
import androidx.lifecycle.LifecycleEventObserver
|
||||
import androidx.compose.ui.unit.dp
|
||||
import com.archipelago.app.data.ServerPreferences
|
||||
import com.archipelago.app.network.ConnectionState
|
||||
@ -60,10 +62,32 @@ fun RemoteInputScreen(onBack: () -> Unit) {
|
||||
|
||||
val ws = remember { InputWebSocket(scope) }
|
||||
val connectionState by ws.state.collectAsState()
|
||||
val lifecycleOwner = LocalLifecycleOwner.current
|
||||
|
||||
BackHandler { onBack() }
|
||||
DisposableEffect(Unit) { onDispose { ws.disconnect() } }
|
||||
LaunchedEffect(activeServer) { activeServer?.let { ws.connect(it.toUrl(), it.password) } }
|
||||
|
||||
// Connect on server change + reconnect when app resumes from background
|
||||
DisposableEffect(lifecycleOwner, activeServer) {
|
||||
val server = activeServer
|
||||
if (server != null) {
|
||||
ws.connect(server.toUrl(), server.password)
|
||||
}
|
||||
|
||||
val observer = LifecycleEventObserver { _, event ->
|
||||
if (event == Lifecycle.Event.ON_RESUME && server != null) {
|
||||
val state = ws.state.value
|
||||
if (state != ConnectionState.CONNECTED && state != ConnectionState.CONNECTING) {
|
||||
ws.connect(server.toUrl(), server.password)
|
||||
}
|
||||
}
|
||||
}
|
||||
lifecycleOwner.lifecycle.addObserver(observer)
|
||||
|
||||
onDispose {
|
||||
lifecycleOwner.lifecycle.removeObserver(observer)
|
||||
ws.disconnect()
|
||||
}
|
||||
}
|
||||
|
||||
Box(
|
||||
Modifier
|
||||
|
||||
@ -3,6 +3,7 @@ mod dwn;
|
||||
mod node_message;
|
||||
mod proxy;
|
||||
mod remote_input;
|
||||
mod remote_relay;
|
||||
mod websocket;
|
||||
|
||||
use crate::api::rpc::RpcHandler;
|
||||
|
||||
@ -226,87 +226,4 @@ impl ApiHandler {
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Browser relay — receives input events from the broadcast channel and forwards to the browser.
|
||||
pub(super) async fn handle_remote_relay(
|
||||
req: Request<hyper::Body>,
|
||||
mut relay_rx: broadcast::Receiver<String>,
|
||||
) -> Result<Response<hyper::Body>> {
|
||||
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
|
||||
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
|
||||
|
||||
if let Some(ws_fut) = ws_fut_opt {
|
||||
tokio::spawn(async move {
|
||||
let ws_stream: WsStream = match ws_fut.await {
|
||||
Ok(Ok(s)) => s,
|
||||
Ok(Err(e)) => {
|
||||
debug!("Remote relay WS handshake failed: {}", e);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Remote relay WS task join failed: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
info!("Remote relay browser connected");
|
||||
let (mut tx, mut rx) = ws_stream.split();
|
||||
|
||||
let _ = tx.send(Message::Text(r#"{"t":"ok"}"#.to_string())).await;
|
||||
|
||||
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
|
||||
tokio::pin!(ping_interval);
|
||||
let mut last_activity = Instant::now();
|
||||
const INACTIVITY_TIMEOUT: u64 = 300;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ping_interval.tick() => {
|
||||
if last_activity.elapsed().as_secs() >= INACTIVITY_TIMEOUT {
|
||||
info!("Remote relay inactive, closing");
|
||||
let _ = tx.send(Message::Close(None)).await;
|
||||
break;
|
||||
}
|
||||
if tx.send(Message::Ping(vec![])).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
input = relay_rx.recv() => {
|
||||
match input {
|
||||
Ok(text) => {
|
||||
if tx.send(Message::Text(text)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
debug!("Remote relay lagged, skipped {} messages", n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
msg = rx.next() => {
|
||||
match msg {
|
||||
Some(Ok(Message::Pong(_))) | Some(Ok(Message::Text(_))) => {
|
||||
last_activity = Instant::now();
|
||||
}
|
||||
Some(Ok(Message::Ping(data))) => {
|
||||
last_activity = Instant::now();
|
||||
let _ = tx.send(Message::Pong(data)).await;
|
||||
}
|
||||
Some(Ok(Message::Close(_))) | None => break,
|
||||
Some(Ok(_)) => { last_activity = Instant::now(); }
|
||||
Some(Err(e)) => {
|
||||
debug!("Remote relay stream error: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Remote relay browser disconnected");
|
||||
});
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
83
core/archipelago/src/api/handler/remote_relay.rs
Normal file
83
core/archipelago/src/api/handler/remote_relay.rs
Normal file
@ -0,0 +1,83 @@
|
||||
use anyhow::Result;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use hyper::{Request, Response};
|
||||
use hyper_ws_listener::WsStream;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use super::ApiHandler;
|
||||
|
||||
impl ApiHandler {
|
||||
/// WebSocket endpoint for browser clients to receive relayed companion input.
|
||||
/// The browser's remote-relay.ts dispatches these as DOM keyboard/mouse events.
|
||||
pub(super) async fn handle_remote_relay(
|
||||
req: Request<hyper::Body>,
|
||||
mut relay_rx: broadcast::Receiver<String>,
|
||||
) -> Result<Response<hyper::Body>> {
|
||||
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
|
||||
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
|
||||
|
||||
if let Some(ws_fut) = ws_fut_opt {
|
||||
tokio::spawn(async move {
|
||||
let ws_stream: WsStream = match ws_fut.await {
|
||||
Ok(Ok(s)) => s,
|
||||
Ok(Err(e)) => {
|
||||
debug!("Remote relay WS handshake failed: {}", e);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Remote relay WS task join failed: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
info!("Remote relay client connected");
|
||||
|
||||
let (mut tx, mut rx) = ws_stream.split();
|
||||
|
||||
// Send ready message
|
||||
let _ = tx.send(Message::Text(r#"{"t":"ok"}"#.to_string())).await;
|
||||
|
||||
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
|
||||
tokio::pin!(ping_interval);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ping_interval.tick() => {
|
||||
if tx.send(Message::Ping(vec![])).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Forward relayed input from companion app
|
||||
msg = relay_rx.recv() => {
|
||||
match msg {
|
||||
Ok(text) => {
|
||||
if tx.send(Message::Text(text)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
debug!("Remote relay lagged, dropped {} messages", n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
// Handle client-side messages (pong, close)
|
||||
client_msg = rx.next() => {
|
||||
match client_msg {
|
||||
Some(Ok(Message::Pong(_))) | Some(Ok(Message::Ping(_))) => {}
|
||||
Some(Ok(Message::Close(_))) | None => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Remote relay client disconnected");
|
||||
});
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user