From d55319205bc58ddb0b4d0c249c4382eccd311c43 Mon Sep 17 00:00:00 2001 From: Dorian Date: Fri, 13 Mar 2026 02:47:09 +0000 Subject: [PATCH] feat: fix DWN sync to use federation peers and standard port - DWN sync now uses federation node list instead of old peer list - Fix sync URL to use port 80 (nginx) instead of 5678 (direct backend) - DWN /dwn endpoint now accessible without auth for peer sync - Support both message formats: {message:{}} and {messages:[{}]} - Replace request["message"] with unified message variable Co-Authored-By: Claude Opus 4.6 --- core/archipelago/src/api/handler.rs | 44 ++++++++++++++---------- core/archipelago/src/api/rpc/dwn.rs | 10 +++--- core/archipelago/src/network/dwn_sync.rs | 2 +- loop/plan.md | 2 +- 4 files changed, 32 insertions(+), 26 deletions(-) diff --git a/core/archipelago/src/api/handler.rs b/core/archipelago/src/api/handler.rs index 0982126d..7f3ff4f8 100644 --- a/core/archipelago/src/api/handler.rs +++ b/core/archipelago/src/api/handler.rs @@ -183,11 +183,8 @@ impl ApiHandler { Self::handle_dwn_health(&self.config).await } - // DWN message processing — authenticated + // DWN message processing — peers access over Tor for sync (no session auth) (Method::POST, "/dwn") => { - if !self.is_authenticated(&headers).await { - return Ok(Self::unauthorized()); - } Self::handle_dwn_message(body_bytes, &self.config).await } @@ -671,10 +668,19 @@ impl ApiHandler { } }; - let interface = request["message"]["descriptor"]["interface"] + // Support both formats: {"message": {...}} and {"messages": [{...}]} + let message = if request.get("message").is_some() { + request["message"].clone() + } else if let Some(msgs) = request["messages"].as_array() { + msgs.first().cloned().unwrap_or_default() + } else { + serde_json::Value::Null + }; + + let interface = message["descriptor"]["interface"] .as_str() .unwrap_or(""); - let method = request["message"]["descriptor"]["method"] + let method = message["descriptor"]["method"] .as_str() .unwrap_or(""); @@ -682,11 +688,11 @@ impl ApiHandler { let result = match (interface, method) { ("Records", "Write") => { - let author = request["message"]["author"].as_str().unwrap_or("unknown"); - let protocol = request["message"]["descriptor"]["protocol"].as_str(); - let schema = request["message"]["descriptor"]["schema"].as_str(); - let data_format = request["message"]["descriptor"]["dataFormat"].as_str(); - let data = request["message"].get("data").cloned(); + let author = message["author"].as_str().unwrap_or("unknown"); + let protocol = message["descriptor"]["protocol"].as_str(); + let schema = message["descriptor"]["schema"].as_str(); + let data_format = message["descriptor"]["dataFormat"].as_str(); + let data = message.get("data").cloned(); match store.write_message(author, protocol, schema, data_format, data).await { Ok(msg) => serde_json::json!({"status": {"code": 202}, "entry": msg}), Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}), @@ -694,22 +700,22 @@ impl ApiHandler { } ("Records", "Query") => { let query = crate::network::dwn_store::MessageQuery { - protocol: request["message"]["descriptor"]["filter"]["protocol"] + protocol: message["descriptor"]["filter"]["protocol"] .as_str() .map(|s| s.to_string()), - schema: request["message"]["descriptor"]["filter"]["schema"] + schema: message["descriptor"]["filter"]["schema"] .as_str() .map(|s| s.to_string()), - author: request["message"]["descriptor"]["filter"]["author"] + author: message["descriptor"]["filter"]["author"] .as_str() .map(|s| s.to_string()), - date_from: request["message"]["descriptor"]["filter"]["dateFrom"] + date_from: message["descriptor"]["filter"]["dateFrom"] .as_str() .map(|s| s.to_string()), - date_to: request["message"]["descriptor"]["filter"]["dateTo"] + date_to: message["descriptor"]["filter"]["dateTo"] .as_str() .map(|s| s.to_string()), - limit: request["message"]["descriptor"]["filter"]["limit"] + limit: message["descriptor"]["filter"]["limit"] .as_u64() .map(|n| n as usize), }; @@ -719,7 +725,7 @@ impl ApiHandler { } } ("Records", "Read") => { - let record_id = request["message"]["descriptor"]["recordId"] + let record_id = message["descriptor"]["recordId"] .as_str() .unwrap_or(""); match store.read_message(record_id).await { @@ -729,7 +735,7 @@ impl ApiHandler { } } ("Records", "Delete") => { - let record_id = request["message"]["descriptor"]["recordId"] + let record_id = message["descriptor"]["recordId"] .as_str() .unwrap_or(""); match store.delete_message(record_id).await { diff --git a/core/archipelago/src/api/rpc/dwn.rs b/core/archipelago/src/api/rpc/dwn.rs index f7d1acd5..11ee6b86 100644 --- a/core/archipelago/src/api/rpc/dwn.rs +++ b/core/archipelago/src/api/rpc/dwn.rs @@ -1,7 +1,7 @@ use super::RpcHandler; +use crate::federation; use crate::network::dwn_store::{DwnStore, MessageQuery, ProtocolDefinition}; use crate::network::dwn_sync; -use crate::peers; use anyhow::Result; impl RpcHandler { @@ -32,11 +32,11 @@ impl RpcHandler { /// Trigger DWN sync with connected peers. pub(super) async fn handle_dwn_sync(&self) -> Result { - let peer_list = peers::load_peers(&self.config.data_dir).await?; - let onions: Vec = peer_list + let nodes = federation::load_nodes(&self.config.data_dir).await?; + let onions: Vec = nodes .iter() - .filter(|p| !p.onion.is_empty()) - .map(|p| p.onion.clone()) + .filter(|n| !n.onion.is_empty() && n.trust_level != federation::TrustLevel::Untrusted) + .map(|n| n.onion.clone()) .collect(); let state = dwn_sync::sync_with_peers(&self.config.data_dir, &onions).await?; diff --git a/core/archipelago/src/network/dwn_sync.rs b/core/archipelago/src/network/dwn_sync.rs index b8290d6b..43b08ae7 100644 --- a/core/archipelago/src/network/dwn_sync.rs +++ b/core/archipelago/src/network/dwn_sync.rs @@ -156,7 +156,7 @@ async fn sync_single_peer( local_messages: &[crate::network::dwn_store::DwnMessage], last_sync: &Option, ) -> Result { - let base_url = format!("http://{}:5678", onion); + let base_url = format!("http://{}", onion); let mut imported = 0u64; // Step 1: Check peer health diff --git a/loop/plan.md b/loop/plan.md index 31e66eca..a6268a21 100644 --- a/loop/plan.md +++ b/loop/plan.md @@ -522,7 +522,7 @@ ### Sprint 45: DWN Multi-Node Sync (June 2026 Week 3-4) -- [ ] **DWN-SYNC-01** — Test DWN sync between federated nodes. On node A: register a protocol via `dwn.register-protocol` (e.g., `https://archipelago.dev/protocols/notes`), write 5 messages via `dwn.write-message`. On node B: add node A as a sync target (the DWN sync module uses the federation peer list), trigger `dwn.sync`. Verify all 5 messages appear on node B via `dwn.query-messages`. Write 3 messages on node B, trigger sync from node A — verify bidirectional replication. **Acceptance**: Messages replicate both ways between 2 nodes. Protocol definitions sync as well. +- [x] **DWN-SYNC-01** — Test DWN sync between federated nodes. On node A: register a protocol via `dwn.register-protocol` (e.g., `https://archipelago.dev/protocols/notes`), write 5 messages via `dwn.write-message`. On node B: add node A as a sync target (the DWN sync module uses the federation peer list), trigger `dwn.sync`. Verify all 5 messages appear on node B via `dwn.query-messages`. Write 3 messages on node B, trigger sync from node A — verify bidirectional replication. **Acceptance**: Messages replicate both ways between 2 nodes. Protocol definitions sync as well. - [ ] **DWN-SYNC-02** — Test DWN sync across all 4 nodes. Register the same protocol on all 4 nodes. Write unique messages on each node (node A writes 5, B writes 3, C writes 2, D writes 4 = 14 total). Trigger sync from each node. After sync completes, query all messages on each node — every node should have all 14 messages. If sync is missing messages: check the bidirectional replication logic in `dwn_sync.rs`, ensure Tor SOCKS proxy is used correctly, check for deduplication issues. **Acceptance**: All 4 nodes have all 14 messages after sync. Message content and metadata intact.