feat(federation): transitive peer learning via state-sync
When Alice syncs state with a Trusted peer Bob, she now learns about Bob's other Trusted peers and auto-adds them as Observers on her side — so Carol's fips_npub is known locally and subsequent federation traffic to Carol can route directly over FIPS without a separate invite round-trip. - NodeStateSnapshot gains a `federated_peers: Vec<FederationPeerHint>` field (serde default for backward compat with v1.4 snapshots). - FederationPeerHint is a minimal projection: did, pubkey, onion, name, fips_npub — excludes per-receiver fields (trust_level, added_at, last_seen, last_state). - build_local_state takes the local federation list and includes only Trusted peers. Observer/Untrusted peers are NOT re-exported — a node shouldn't launder other people's federation through its own authority. - sync_with_peer merges the received hints via merge_transitive_peers when the source is Trusted: existing entries get fips_npub refreshed if missing; unknown DIDs are added at Observer trust (never auto-promoted to Trusted). - Bounded to 1 hop: merged Observer entries do NOT get re-exported in the local node's own snapshots. So Bob → Alice learns Carol, but Alice's snapshots to Dave do not include Carol. - Tests: round-trip + filter-non-trusted-from-snapshot coverage. - Storage + delta test fixtures updated for the new field. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
bcd9b9aa56
commit
4d8c8c89a2
@ -379,6 +379,14 @@ impl RpcHandler {
|
|||||||
.and_then(|hex| nostr_sdk::PublicKey::from_hex(&hex).ok())
|
.and_then(|hex| nostr_sdk::PublicKey::from_hex(&hex).ok())
|
||||||
.and_then(|pk| nostr_sdk::ToBech32::to_bech32(&pk).ok());
|
.and_then(|pk| nostr_sdk::ToBech32::to_bech32(&pk).ok());
|
||||||
|
|
||||||
|
// Pass the current federated-peer list so the snapshot can include
|
||||||
|
// a `federated_peers` hint for transitive federation — receivers
|
||||||
|
// who trust us learn our Trusted peers and can route to them
|
||||||
|
// over FIPS without a separate invite round-trip.
|
||||||
|
let federated_peers = federation::load_nodes(&self.config.data_dir)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let state = federation::build_local_state(
|
let state = federation::build_local_state(
|
||||||
apps,
|
apps,
|
||||||
0.0,
|
0.0,
|
||||||
@ -390,6 +398,7 @@ impl RpcHandler {
|
|||||||
tor_active,
|
tor_active,
|
||||||
server_name,
|
server_name,
|
||||||
nostr_npub,
|
nostr_npub,
|
||||||
|
&federated_peers,
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(serde_json::to_value(&state)?)
|
Ok(serde_json::to_value(&state)?)
|
||||||
|
|||||||
@ -275,6 +275,7 @@ mod tests {
|
|||||||
uptime_secs: Some(86400),
|
uptime_secs: Some(86400),
|
||||||
tor_active: Some(true),
|
tor_active: Some(true),
|
||||||
nostr_npub: None,
|
nostr_npub: None,
|
||||||
|
federated_peers: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
update_node_state(dir.path(), "did:key:z1", state)
|
update_node_state(dir.path(), "did:key:z1", state)
|
||||||
|
|||||||
@ -8,7 +8,9 @@ use anyhow::{Context, Result};
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
use super::storage::update_node_state;
|
use super::storage::update_node_state;
|
||||||
use super::types::{AppStatus, FederatedNode, NodeStateSnapshot, TrustLevel};
|
use super::types::{
|
||||||
|
AppStatus, FederatedNode, FederationPeerHint, NodeStateSnapshot, TrustLevel,
|
||||||
|
};
|
||||||
use crate::fips::dial::PeerRequest;
|
use crate::fips::dial::PeerRequest;
|
||||||
|
|
||||||
/// Sync state with a single federated peer. Tries FIPS first; falls back
|
/// Sync state with a single federated peer. Tries FIPS first; falls back
|
||||||
@ -51,10 +53,90 @@ pub async fn sync_with_peer(
|
|||||||
|
|
||||||
update_node_state(data_dir, &peer.did, state.clone()).await?;
|
update_node_state(data_dir, &peer.did, state.clone()).await?;
|
||||||
|
|
||||||
|
// Transitive federation: merge in peers our (Trusted) source advertised
|
||||||
|
// so we can route directly to them over FIPS without a second invite
|
||||||
|
// hop. Only runs when the source is Trusted — Observer-level peers
|
||||||
|
// don't get to expand our federation on their own authority.
|
||||||
|
if peer.trust_level == TrustLevel::Trusted {
|
||||||
|
if let Err(e) =
|
||||||
|
merge_transitive_peers(data_dir, &peer.did, &state.federated_peers).await
|
||||||
|
{
|
||||||
|
tracing::warn!(
|
||||||
|
peer_did = %peer.did,
|
||||||
|
error = %e,
|
||||||
|
"Transitive federation merge failed (non-fatal)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(state)
|
Ok(state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Merge peers advertised by a Trusted federated node into our own
|
||||||
|
/// federation list. New peers are added at `Observer` trust (not
|
||||||
|
/// Trusted — that requires a direct invite). Existing peers get their
|
||||||
|
/// `fips_npub` refreshed if we hadn't learned it yet.
|
||||||
|
///
|
||||||
|
/// Peers we are (us) or that we already track by DID are skipped.
|
||||||
|
async fn merge_transitive_peers(
|
||||||
|
data_dir: &std::path::Path,
|
||||||
|
source_did: &str,
|
||||||
|
hints: &[FederationPeerHint],
|
||||||
|
) -> Result<()> {
|
||||||
|
if hints.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let mut nodes = super::storage::load_nodes(data_dir).await?;
|
||||||
|
let mut added = 0u32;
|
||||||
|
let mut refreshed = 0u32;
|
||||||
|
|
||||||
|
for hint in hints {
|
||||||
|
// Don't import our own DID (a peer advertising us back).
|
||||||
|
if hint.did == source_did {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Some(existing) = nodes.iter_mut().find(|n| n.did == hint.did) {
|
||||||
|
// Already known — just refresh fips_npub if we didn't have one.
|
||||||
|
if existing.fips_npub.is_none() && hint.fips_npub.is_some() {
|
||||||
|
existing.fips_npub = hint.fips_npub.clone();
|
||||||
|
refreshed += 1;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
nodes.push(FederatedNode {
|
||||||
|
did: hint.did.clone(),
|
||||||
|
pubkey: hint.pubkey.clone(),
|
||||||
|
onion: hint.onion.clone(),
|
||||||
|
name: hint.name.clone(),
|
||||||
|
trust_level: TrustLevel::Observer,
|
||||||
|
added_at: chrono::Utc::now().to_rfc3339(),
|
||||||
|
last_seen: None,
|
||||||
|
last_state: None,
|
||||||
|
fips_npub: hint.fips_npub.clone(),
|
||||||
|
});
|
||||||
|
added += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if added > 0 || refreshed > 0 {
|
||||||
|
super::storage::save_nodes(data_dir, &nodes).await?;
|
||||||
|
tracing::info!(
|
||||||
|
source_did = %source_did,
|
||||||
|
added,
|
||||||
|
refreshed,
|
||||||
|
"Transitive federation merge complete"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Build the local node's state snapshot for sharing with peers.
|
/// Build the local node's state snapshot for sharing with peers.
|
||||||
|
///
|
||||||
|
/// `federated_peers` should be the caller's full list of federated
|
||||||
|
/// nodes; `build_local_state` filters them down to a `FederationPeerHint`
|
||||||
|
/// so receivers can perform transitive pairing (learn peers-of-peers
|
||||||
|
/// and route directly over FIPS from now on). Only peers we trust are
|
||||||
|
/// shared — an Untrusted/Observer node should not be re-exported
|
||||||
|
/// through us to the network.
|
||||||
pub fn build_local_state(
|
pub fn build_local_state(
|
||||||
apps: Vec<AppStatus>,
|
apps: Vec<AppStatus>,
|
||||||
cpu: f64,
|
cpu: f64,
|
||||||
@ -66,7 +148,20 @@ pub fn build_local_state(
|
|||||||
tor_active: bool,
|
tor_active: bool,
|
||||||
server_name: Option<String>,
|
server_name: Option<String>,
|
||||||
nostr_npub: Option<String>,
|
nostr_npub: Option<String>,
|
||||||
|
federated_peers: &[FederatedNode],
|
||||||
) -> NodeStateSnapshot {
|
) -> NodeStateSnapshot {
|
||||||
|
let hints = federated_peers
|
||||||
|
.iter()
|
||||||
|
.filter(|n| n.trust_level == TrustLevel::Trusted)
|
||||||
|
.map(|n| FederationPeerHint {
|
||||||
|
did: n.did.clone(),
|
||||||
|
pubkey: n.pubkey.clone(),
|
||||||
|
onion: n.onion.clone(),
|
||||||
|
name: n.name.clone(),
|
||||||
|
fips_npub: n.fips_npub.clone(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
NodeStateSnapshot {
|
NodeStateSnapshot {
|
||||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||||
node_name: server_name,
|
node_name: server_name,
|
||||||
@ -79,6 +174,7 @@ pub fn build_local_state(
|
|||||||
uptime_secs: Some(uptime),
|
uptime_secs: Some(uptime),
|
||||||
tor_active: Some(tor_active),
|
tor_active: Some(tor_active),
|
||||||
nostr_npub,
|
nostr_npub,
|
||||||
|
federated_peers: hints,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,10 +262,61 @@ mod tests {
|
|||||||
true,
|
true,
|
||||||
Some("Test Node".to_string()),
|
Some("Test Node".to_string()),
|
||||||
None,
|
None,
|
||||||
|
&[],
|
||||||
);
|
);
|
||||||
assert_eq!(state.apps.len(), 1);
|
assert_eq!(state.apps.len(), 1);
|
||||||
assert_eq!(state.cpu_usage_percent, Some(25.5));
|
assert_eq!(state.cpu_usage_percent, Some(25.5));
|
||||||
assert_eq!(state.tor_active, Some(true));
|
assert_eq!(state.tor_active, Some(true));
|
||||||
assert_eq!(state.node_name, Some("Test Node".to_string()));
|
assert_eq!(state.node_name, Some("Test Node".to_string()));
|
||||||
|
assert!(state.federated_peers.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_local_state_filters_non_trusted_peers() {
|
||||||
|
let peers = vec![
|
||||||
|
FederatedNode {
|
||||||
|
did: "did:key:zTrusted".into(),
|
||||||
|
pubkey: "aa".into(),
|
||||||
|
onion: "t.onion".into(),
|
||||||
|
name: None,
|
||||||
|
trust_level: TrustLevel::Trusted,
|
||||||
|
added_at: "now".into(),
|
||||||
|
last_seen: None,
|
||||||
|
last_state: None,
|
||||||
|
fips_npub: Some("npub1a".into()),
|
||||||
|
},
|
||||||
|
FederatedNode {
|
||||||
|
did: "did:key:zObserver".into(),
|
||||||
|
pubkey: "bb".into(),
|
||||||
|
onion: "o.onion".into(),
|
||||||
|
name: None,
|
||||||
|
trust_level: TrustLevel::Observer,
|
||||||
|
added_at: "now".into(),
|
||||||
|
last_seen: None,
|
||||||
|
last_state: None,
|
||||||
|
fips_npub: Some("npub1b".into()),
|
||||||
|
},
|
||||||
|
FederatedNode {
|
||||||
|
did: "did:key:zUntrusted".into(),
|
||||||
|
pubkey: "cc".into(),
|
||||||
|
onion: "u.onion".into(),
|
||||||
|
name: None,
|
||||||
|
trust_level: TrustLevel::Untrusted,
|
||||||
|
added_at: "now".into(),
|
||||||
|
last_seen: None,
|
||||||
|
last_state: None,
|
||||||
|
fips_npub: None,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
let state = build_local_state(
|
||||||
|
vec![],
|
||||||
|
0.0, 0, 0, 0, 0, 0, true, None, None, &peers,
|
||||||
|
);
|
||||||
|
assert_eq!(state.federated_peers.len(), 1);
|
||||||
|
assert_eq!(state.federated_peers[0].did, "did:key:zTrusted");
|
||||||
|
assert_eq!(
|
||||||
|
state.federated_peers[0].fips_npub.as_deref(),
|
||||||
|
Some("npub1a")
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -68,6 +68,27 @@ pub struct NodeStateSnapshot {
|
|||||||
/// haven't synced after this field was added will report None.
|
/// haven't synced after this field was added will report None.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub nostr_npub: Option<String>,
|
pub nostr_npub: Option<String>,
|
||||||
|
/// Minimal summary of peers this node trusts, used for transitive
|
||||||
|
/// federation: when Alice syncs with Bob, she learns Bob's trusted
|
||||||
|
/// peers and adds them as Observers on her side so `fips_npub` is
|
||||||
|
/// known and future state-syncs can route directly. Bounded to one
|
||||||
|
/// hop (Alice doesn't auto-promote Observer-via-Bob to Trusted nor
|
||||||
|
/// re-export them in her own state snapshots).
|
||||||
|
#[serde(default)]
|
||||||
|
pub federated_peers: Vec<FederationPeerHint>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Minimal peer summary shared via `NodeStateSnapshot.federated_peers`.
|
||||||
|
/// Excludes sensitive/per-receiver fields like trust_level and added_at.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct FederationPeerHint {
|
||||||
|
pub did: String,
|
||||||
|
pub pubkey: String,
|
||||||
|
pub onion: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub name: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub fips_npub: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Status of a single app/container on a remote node.
|
/// Status of a single app/container on a remote node.
|
||||||
|
|||||||
@ -223,6 +223,7 @@ mod tests {
|
|||||||
uptime_secs: Some(86400),
|
uptime_secs: Some(86400),
|
||||||
tor_active: Some(true),
|
tor_active: Some(true),
|
||||||
nostr_npub: None,
|
nostr_npub: None,
|
||||||
|
federated_peers: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -255,6 +256,7 @@ mod tests {
|
|||||||
uptime_secs: Some(86700), // Changed
|
uptime_secs: Some(86700), // Changed
|
||||||
tor_active: Some(true),
|
tor_active: Some(true),
|
||||||
nostr_npub: None,
|
nostr_npub: None,
|
||||||
|
federated_peers: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user