From 6ddce90e4529968f0414921130fcb4ef4a04b969 Mon Sep 17 00:00:00 2001 From: Dorian Date: Sun, 19 Apr 2026 01:58:21 -0400 Subject: [PATCH] feat(federation): transitive peer learning via state-sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When Alice syncs state with a Trusted peer Bob, she now learns about Bob's other Trusted peers and auto-adds them as Observers on her side — so Carol's fips_npub is known locally and subsequent federation traffic to Carol can route directly over FIPS without a separate invite round-trip. - NodeStateSnapshot gains a `federated_peers: Vec` field (serde default for backward compat with v1.4 snapshots). - FederationPeerHint is a minimal projection: did, pubkey, onion, name, fips_npub — excludes per-receiver fields (trust_level, added_at, last_seen, last_state). - build_local_state takes the local federation list and includes only Trusted peers. Observer/Untrusted peers are NOT re-exported — a node shouldn't launder other people's federation through its own authority. - sync_with_peer merges the received hints via merge_transitive_peers when the source is Trusted: existing entries get fips_npub refreshed if missing; unknown DIDs are added at Observer trust (never auto-promoted to Trusted). - Bounded to 1 hop: merged Observer entries do NOT get re-exported in the local node's own snapshots. So Bob → Alice learns Carol, but Alice's snapshots to Dave do not include Carol. - Tests: round-trip + filter-non-trusted-from-snapshot coverage. - Storage + delta test fixtures updated for the new field. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/api/rpc/federation/handlers.rs | 9 ++ core/archipelago/src/federation/storage.rs | 1 + core/archipelago/src/federation/sync.rs | 149 +++++++++++++++++- core/archipelago/src/federation/types.rs | 21 +++ core/archipelago/src/transport/delta.rs | 2 + 5 files changed, 181 insertions(+), 1 deletion(-) diff --git a/core/archipelago/src/api/rpc/federation/handlers.rs b/core/archipelago/src/api/rpc/federation/handlers.rs index d57ad46a..baef7b6a 100644 --- a/core/archipelago/src/api/rpc/federation/handlers.rs +++ b/core/archipelago/src/api/rpc/federation/handlers.rs @@ -379,6 +379,14 @@ impl RpcHandler { .and_then(|hex| nostr_sdk::PublicKey::from_hex(&hex).ok()) .and_then(|pk| nostr_sdk::ToBech32::to_bech32(&pk).ok()); + // Pass the current federated-peer list so the snapshot can include + // a `federated_peers` hint for transitive federation — receivers + // who trust us learn our Trusted peers and can route to them + // over FIPS without a separate invite round-trip. + let federated_peers = federation::load_nodes(&self.config.data_dir) + .await + .unwrap_or_default(); + let state = federation::build_local_state( apps, 0.0, @@ -390,6 +398,7 @@ impl RpcHandler { tor_active, server_name, nostr_npub, + &federated_peers, ); Ok(serde_json::to_value(&state)?) diff --git a/core/archipelago/src/federation/storage.rs b/core/archipelago/src/federation/storage.rs index 056dbe50..dc0eded3 100644 --- a/core/archipelago/src/federation/storage.rs +++ b/core/archipelago/src/federation/storage.rs @@ -275,6 +275,7 @@ mod tests { uptime_secs: Some(86400), tor_active: Some(true), nostr_npub: None, + federated_peers: Vec::new(), }; update_node_state(dir.path(), "did:key:z1", state) diff --git a/core/archipelago/src/federation/sync.rs b/core/archipelago/src/federation/sync.rs index a26283aa..cdcf8972 100644 --- a/core/archipelago/src/federation/sync.rs +++ b/core/archipelago/src/federation/sync.rs @@ -8,7 +8,9 @@ use anyhow::{Context, Result}; use std::path::Path; use super::storage::update_node_state; -use super::types::{AppStatus, FederatedNode, NodeStateSnapshot, TrustLevel}; +use super::types::{ + AppStatus, FederatedNode, FederationPeerHint, NodeStateSnapshot, TrustLevel, +}; use crate::fips::dial::PeerRequest; /// Sync state with a single federated peer. Tries FIPS first; falls back @@ -51,10 +53,90 @@ pub async fn sync_with_peer( update_node_state(data_dir, &peer.did, state.clone()).await?; + // Transitive federation: merge in peers our (Trusted) source advertised + // so we can route directly to them over FIPS without a second invite + // hop. Only runs when the source is Trusted — Observer-level peers + // don't get to expand our federation on their own authority. + if peer.trust_level == TrustLevel::Trusted { + if let Err(e) = + merge_transitive_peers(data_dir, &peer.did, &state.federated_peers).await + { + tracing::warn!( + peer_did = %peer.did, + error = %e, + "Transitive federation merge failed (non-fatal)" + ); + } + } + Ok(state) } +/// Merge peers advertised by a Trusted federated node into our own +/// federation list. New peers are added at `Observer` trust (not +/// Trusted — that requires a direct invite). Existing peers get their +/// `fips_npub` refreshed if we hadn't learned it yet. +/// +/// Peers we are (us) or that we already track by DID are skipped. +async fn merge_transitive_peers( + data_dir: &std::path::Path, + source_did: &str, + hints: &[FederationPeerHint], +) -> Result<()> { + if hints.is_empty() { + return Ok(()); + } + let mut nodes = super::storage::load_nodes(data_dir).await?; + let mut added = 0u32; + let mut refreshed = 0u32; + + for hint in hints { + // Don't import our own DID (a peer advertising us back). + if hint.did == source_did { + continue; + } + if let Some(existing) = nodes.iter_mut().find(|n| n.did == hint.did) { + // Already known — just refresh fips_npub if we didn't have one. + if existing.fips_npub.is_none() && hint.fips_npub.is_some() { + existing.fips_npub = hint.fips_npub.clone(); + refreshed += 1; + } + continue; + } + nodes.push(FederatedNode { + did: hint.did.clone(), + pubkey: hint.pubkey.clone(), + onion: hint.onion.clone(), + name: hint.name.clone(), + trust_level: TrustLevel::Observer, + added_at: chrono::Utc::now().to_rfc3339(), + last_seen: None, + last_state: None, + fips_npub: hint.fips_npub.clone(), + }); + added += 1; + } + + if added > 0 || refreshed > 0 { + super::storage::save_nodes(data_dir, &nodes).await?; + tracing::info!( + source_did = %source_did, + added, + refreshed, + "Transitive federation merge complete" + ); + } + Ok(()) +} + /// Build the local node's state snapshot for sharing with peers. +/// +/// `federated_peers` should be the caller's full list of federated +/// nodes; `build_local_state` filters them down to a `FederationPeerHint` +/// so receivers can perform transitive pairing (learn peers-of-peers +/// and route directly over FIPS from now on). Only peers we trust are +/// shared — an Untrusted/Observer node should not be re-exported +/// through us to the network. pub fn build_local_state( apps: Vec, cpu: f64, @@ -66,7 +148,20 @@ pub fn build_local_state( tor_active: bool, server_name: Option, nostr_npub: Option, + federated_peers: &[FederatedNode], ) -> NodeStateSnapshot { + let hints = federated_peers + .iter() + .filter(|n| n.trust_level == TrustLevel::Trusted) + .map(|n| FederationPeerHint { + did: n.did.clone(), + pubkey: n.pubkey.clone(), + onion: n.onion.clone(), + name: n.name.clone(), + fips_npub: n.fips_npub.clone(), + }) + .collect(); + NodeStateSnapshot { timestamp: chrono::Utc::now().to_rfc3339(), node_name: server_name, @@ -79,6 +174,7 @@ pub fn build_local_state( uptime_secs: Some(uptime), tor_active: Some(tor_active), nostr_npub, + federated_peers: hints, } } @@ -166,10 +262,61 @@ mod tests { true, Some("Test Node".to_string()), None, + &[], ); assert_eq!(state.apps.len(), 1); assert_eq!(state.cpu_usage_percent, Some(25.5)); assert_eq!(state.tor_active, Some(true)); assert_eq!(state.node_name, Some("Test Node".to_string())); + assert!(state.federated_peers.is_empty()); + } + + #[test] + fn build_local_state_filters_non_trusted_peers() { + let peers = vec![ + FederatedNode { + did: "did:key:zTrusted".into(), + pubkey: "aa".into(), + onion: "t.onion".into(), + name: None, + trust_level: TrustLevel::Trusted, + added_at: "now".into(), + last_seen: None, + last_state: None, + fips_npub: Some("npub1a".into()), + }, + FederatedNode { + did: "did:key:zObserver".into(), + pubkey: "bb".into(), + onion: "o.onion".into(), + name: None, + trust_level: TrustLevel::Observer, + added_at: "now".into(), + last_seen: None, + last_state: None, + fips_npub: Some("npub1b".into()), + }, + FederatedNode { + did: "did:key:zUntrusted".into(), + pubkey: "cc".into(), + onion: "u.onion".into(), + name: None, + trust_level: TrustLevel::Untrusted, + added_at: "now".into(), + last_seen: None, + last_state: None, + fips_npub: None, + }, + ]; + let state = build_local_state( + vec![], + 0.0, 0, 0, 0, 0, 0, true, None, None, &peers, + ); + assert_eq!(state.federated_peers.len(), 1); + assert_eq!(state.federated_peers[0].did, "did:key:zTrusted"); + assert_eq!( + state.federated_peers[0].fips_npub.as_deref(), + Some("npub1a") + ); } } diff --git a/core/archipelago/src/federation/types.rs b/core/archipelago/src/federation/types.rs index 788bd246..5aef877f 100644 --- a/core/archipelago/src/federation/types.rs +++ b/core/archipelago/src/federation/types.rs @@ -68,6 +68,27 @@ pub struct NodeStateSnapshot { /// haven't synced after this field was added will report None. #[serde(default)] pub nostr_npub: Option, + /// Minimal summary of peers this node trusts, used for transitive + /// federation: when Alice syncs with Bob, she learns Bob's trusted + /// peers and adds them as Observers on her side so `fips_npub` is + /// known and future state-syncs can route directly. Bounded to one + /// hop (Alice doesn't auto-promote Observer-via-Bob to Trusted nor + /// re-export them in her own state snapshots). + #[serde(default)] + pub federated_peers: Vec, +} + +/// Minimal peer summary shared via `NodeStateSnapshot.federated_peers`. +/// Excludes sensitive/per-receiver fields like trust_level and added_at. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FederationPeerHint { + pub did: String, + pub pubkey: String, + pub onion: String, + #[serde(default)] + pub name: Option, + #[serde(default)] + pub fips_npub: Option, } /// Status of a single app/container on a remote node. diff --git a/core/archipelago/src/transport/delta.rs b/core/archipelago/src/transport/delta.rs index a89aee67..a99c910d 100644 --- a/core/archipelago/src/transport/delta.rs +++ b/core/archipelago/src/transport/delta.rs @@ -223,6 +223,7 @@ mod tests { uptime_secs: Some(86400), tor_active: Some(true), nostr_npub: None, + federated_peers: Vec::new(), } } @@ -255,6 +256,7 @@ mod tests { uptime_secs: Some(86700), // Changed tor_active: Some(true), nostr_npub: None, + federated_peers: Vec::new(), } }