use super::RpcHandler; use crate::federation; use crate::network::dwn_store::{DwnStore, MessageQuery, ProtocolDefinition}; use crate::network::dwn_sync; use anyhow::Result; impl RpcHandler { /// Get DWN status and sync state. pub(super) async fn handle_dwn_status(&self) -> Result { let sync_state = dwn_sync::load_sync_state(&self.config.data_dir).await?; let server_status = dwn_sync::get_dwn_status().await.unwrap_or(dwn_sync::DwnStatusResponse { running: false, version: String::new(), }); let store = DwnStore::new(&self.config.data_dir).await?; let stats = store.stats().await?; Ok(serde_json::json!({ "running": server_status.running, "version": server_status.version, "sync_status": sync_state.status, "last_sync": sync_state.last_sync, "messages_synced": sync_state.messages_synced, "storage_bytes": stats.total_bytes, "message_count": stats.message_count, "protocol_count": stats.protocol_count, "registered_protocols": sync_state.registered_protocols, "peer_sync_targets": sync_state.peer_sync_targets, })) } /// Trigger DWN sync with connected peers. pub(super) async fn handle_dwn_sync(&self) -> Result { let nodes = federation::load_nodes(&self.config.data_dir).await?; let onions: Vec = nodes .iter() .filter(|n| !n.onion.is_empty() && n.trust_level != federation::TrustLevel::Untrusted) .map(|n| n.onion.clone()) .collect(); let state = dwn_sync::sync_with_peers(&self.config.data_dir, &onions).await?; Ok(serde_json::json!({ "sync_status": state.status, "last_sync": state.last_sync, "messages_synced": state.messages_synced, })) } /// Register a DWN protocol. pub(super) async fn handle_dwn_register_protocol( &self, params: &serde_json::Value, ) -> Result { let protocol = params["protocol"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing 'protocol' parameter"))?; let published = params["published"].as_bool().unwrap_or(false); let definition = ProtocolDefinition { protocol: protocol.to_string(), published, types: params .get("types") .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default(), structure: params .get("structure") .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default(), date_registered: chrono::Utc::now().to_rfc3339(), }; let store = DwnStore::new(&self.config.data_dir).await?; store.register_protocol(&definition).await?; Ok(serde_json::json!({"registered": true, "protocol": protocol})) } /// List registered DWN protocols. pub(super) async fn handle_dwn_list_protocols(&self) -> Result { let store = DwnStore::new(&self.config.data_dir).await?; let protocols = store.list_protocols().await?; Ok(serde_json::json!({"protocols": protocols})) } /// Remove a DWN protocol. pub(super) async fn handle_dwn_remove_protocol( &self, params: &serde_json::Value, ) -> Result { let protocol = params["protocol"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing 'protocol' parameter"))?; let store = DwnStore::new(&self.config.data_dir).await?; let removed = store.remove_protocol(protocol).await?; Ok(serde_json::json!({"removed": removed, "protocol": protocol})) } /// Query DWN messages. pub(super) async fn handle_dwn_query_messages( &self, params: &serde_json::Value, ) -> Result { let query = MessageQuery { protocol: params["protocol"].as_str().map(|s| s.to_string()), schema: params["schema"].as_str().map(|s| s.to_string()), author: params["author"].as_str().map(|s| s.to_string()), date_from: params["dateFrom"].as_str().map(|s| s.to_string()), date_to: params["dateTo"].as_str().map(|s| s.to_string()), limit: params["limit"].as_u64().map(|n| n as usize), }; let store = DwnStore::new(&self.config.data_dir).await?; let messages = store.query_messages(&query).await?; Ok(serde_json::json!({"messages": messages, "count": messages.len()})) } /// Write a DWN message. pub(super) async fn handle_dwn_write_message( &self, params: &serde_json::Value, ) -> Result { let author = params["author"] .as_str() .ok_or_else(|| anyhow::anyhow!("Missing 'author' parameter"))?; let protocol = params["protocol"].as_str(); let schema = params["schema"].as_str(); let data_format = params["dataFormat"].as_str(); let data = params.get("data").cloned(); let store = DwnStore::new(&self.config.data_dir).await?; let message = store .write_message(author, protocol, schema, data_format, data) .await?; Ok(serde_json::json!({"written": true, "record_id": message.record_id})) } }