archy/reticulum-daemon/reticulum_daemon.py
archipelago f54c853128 feat(mesh): Reticulum LoRa hardware gates pass + RNS Resource transfer + image/voice attachments
Phase 0 gates #2/#3 (two-node LXMF-over-LoRa, external Sideband interop) passed
on real hardware (.116's flashed Heltec V3 RNode <-> a phone-flashed RNode running
Sideband) — RNS announce, encrypted DM round-trip, and contact binding all verified
live. Fixed two bugs found in the process: the Reticulum send path wasn't stamping
outbound messages as E2E despite LXMF being unconditionally encrypted, and the
per-message transport pill collapsed Meshcore/Meshtastic into one generic "lora"
color instead of distinguishing the three radio transports.

Built on top of that link: a Columba-style image/file send experience —
compression-quality presets with a real transfer-time estimate (mesh.transport-advice,
now device-throughput-aware), receive-side thumbnail previews + auto-render for
already-local attachments, and async voice messages, all reusing the existing
ContentRef/ContentInline attachment pipeline. The headline addition is genuine RNS
Resource transfer support (daemon-side RNS.Link + RNS.Resource, Rust-side
send_resource/resource_recv plumbing, a new "resource-mesh" transport-advice tier)
so compressed photos up to 2MB now actually transfer over LoRa for Reticulum peers
instead of always falling back to Tor past the small inline-chunk cap.
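
For a rough sense of what such a transfer-time estimate involves, here is a minimal sketch based on the standard LoRa raw bit-rate formula and the daemon's default radio profile (SF 8, 125 kHz, coding rate 4/5). The function names are hypothetical and this is not the mesh.transport-advice implementation; the figure ignores preamble, packet headers, RNS Link/Resource overhead, retransmissions and duty-cycle limits, so real transfers will be slower.

```python
def lora_raw_bitrate(spreading_factor: int, bandwidth_hz: int, coding_rate: int) -> float:
    """Raw LoRa bit rate in bits/s: SF * (BW / 2**SF) * (4 / CR).

    coding_rate is the denominator of the 4/CR code rate (5..8), matching
    the `codingrate` value the daemon writes into the RNS config.
    """
    return spreading_factor * bandwidth_hz * 4 / (2 ** spreading_factor * coding_rate)


def estimate_transfer_seconds(payload_bytes: int, bitrate_bps: float) -> float:
    """Lower-bound airtime for a payload at the given raw bit rate."""
    return payload_bytes * 8 / bitrate_bps


if __name__ == "__main__":
    rate = lora_raw_bitrate(8, 125_000, 5)  # daemon defaults
    seconds = estimate_transfer_seconds(2 * 1024 * 1024, rate)
    print(f"raw rate {rate:.0f} bit/s, 2 MiB lower bound {seconds / 60:.0f} min")
```

Even as a lower bound this shows why the compression-quality presets matter: at the default profile a full 2 MiB payload needs well over an hour of airtime.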

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
2026-06-30 19:57:01 -04:00

508 lines
23 KiB
Python

#!/usr/bin/env python3
"""Archipelago Reticulum daemon — host-supervised RNS + LXMF bridge.

archipelago spawns and supervises one of these per active Reticulum (RNode) radio.
It owns the serial port, runs the Reticulum stack + an LXMF router whose identity is
**derived deterministically from the Archy node key**, and exposes a tiny
line-delimited JSON-RPC over a Unix domain socket (0600) for the Rust side to drive.

Security posture (see the plan's "most secure way" section):
  * Runs as the same rootless archipelago user; no root, no network control plane.
  * RPC is a Unix socket only; the identity key never leaves the host and is never
    logged. The daemon executes ONLY the fixed verb set below.

RPC (one JSON object per line, both directions):
  in : {"cmd":"send","dest_hash":"<hex16>","content":"","title":"","method":"direct|opportunistic|propagated"}
       {"cmd":"announce"}
       {"cmd":"status"}
       {"cmd":"send_resource","id":"<correlation>","dest_hash":"<hex16>","data_b64":""}
       {"cmd":"shutdown"}
  out: {"event":"ready","dest_hash":"<hex16>","display_name":""}
       {"event":"recv","source_hash":"<hex16>","content":"","title":"","fields":{…},"app_data":"<hex>","rssi":n,"snr":n,"stamp":t}
       {"event":"announce","dest_hash":"<hex16>","app_data":"<hex>"}
       {"event":"delivered","dest_hash":"<hex16>","state":"delivered|failed","id":"<hex>"}
       {"event":"status","connected":bool,"dest_hash":"<hex16>","interfaces":[…]}
       {"event":"resource_progress","id":"<correlation>","transferred":n,"total":n}
       {"event":"resource_sent","id":"<correlation>"}
       {"event":"resource_failed","id":"<correlation>","reason":""}
       {"event":"resource_recv","source_hash":"<hex16>","data_b64":""}
       {"event":"error","where":"","detail":""}

``send_resource`` is for large (>~2.3KB) binary payloads that don't fit the small
LXMF-message path. It uses RNS's native Resource transfer protocol over an `RNS.Link`
to the peer's *resource* destination (a separate aspect from the LXMF delivery
destination, so it doesn't disturb normal messaging). Built for sending compressed
photos/files/voice-messages directly over LoRa instead of always falling back to Tor
past the small-message size cap. ``resource_recv``'s `data_b64` is the same
CBOR-encoded payload format already used for the small-inline LoRa path, so the Rust
side decodes it identically regardless of which path delivered it.

This is the Phase-1 skeleton: the identity/LXMF wiring and RPC loop are real and
exercised by ``--check`` / ``--selftest`` (no radio). The live LoRa message path is
validated in the Phase-0 hardware gate on .116/.228.
"""
from __future__ import annotations
import argparse
import asyncio
import base64
import json
import os
import signal
import sys
from pathlib import Path
from archy_rns_identity import lxmf_destination_hash, load_identity
# Lazy heavy imports (RNS/LXMF) happen in run(); --check stays import-light.
# ─────────────────────────── RNS config generation ───────────────────────────
def _write_rns_config(configdir: Path, *, serial_port: str | None, lora: dict, no_radio: bool) -> None:
    """Materialise an RNS config file. RNode interface for real radios; a loopback-
    only config for --selftest so the stack comes up without hardware or network."""
    if not no_radio and not serial_port:
        # Without this guard the config would contain the literal "port = None".
        raise SystemExit("--serial-port is required unless --no-radio or --selftest is set")
    configdir.mkdir(parents=True, exist_ok=True)
    cfg = configdir / "config"
    if no_radio:
        interfaces = (
            " [[Default Interface]]\n"
            " type = AutoInterface\n"
            " enabled = no\n"
        )
    else:
        interfaces = (
            " [[RNode LoRa]]\n"
            " type = RNodeInterface\n"
            " enabled = yes\n"
            f" port = {serial_port}\n"
            f" frequency = {lora['frequency']}\n"
            f" bandwidth = {lora['bandwidth']}\n"
            f" txpower = {lora['txpower']}\n"
            f" spreadingfactor = {lora['spreadingfactor']}\n"
            f" codingrate = {lora['codingrate']}\n"
        )
    cfg.write_text(
        "[reticulum]\n"
        " enable_transport = no\n"
        " share_instance = no\n"
        " panic_on_interface_error = no\n\n"
        "[interfaces]\n" + interfaces
    )
    os.chmod(cfg, 0o600)
# ─────────────────────────────── the daemon ──────────────────────────────────
class ReticulumDaemon:
    def __init__(self, args):
        self.args = args
        self.seed = self._read_seed(Path(args.identity_key))
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.clients: set[asyncio.StreamWriter] = set()
        self.reticulum = None
        self.router = None
        self.delivery_destination = None
        self.identity = None
        self.dest_hash_hex = lxmf_destination_hash(self.seed).hex()
        # Resource transfer (large binary payloads over a dedicated Link, separate
        # from LXMF delivery — see module docstring). Keyed by the peer's *resource*
        # destination hash (bytes), not their LXMF delivery hash.
        self.resource_destination = None
        self.links: dict[bytes, "RNS.Link"] = {}
        self.pending_resource_sends: dict[bytes, list[tuple[bytes, str]]] = {}

    @staticmethod
    def _read_seed(path: Path) -> bytes:
        seed = path.read_bytes()
        if len(seed) != 32:
            raise SystemExit(f"identity key {path} must be 32 bytes, got {len(seed)}")
        return seed

    # ---- RNS / LXMF bring-up ----
    def bring_up(self):
        import RNS
        import LXMF
        configdir = Path(self.args.rns_config)
        _write_rns_config(
            configdir,
            serial_port=self.args.serial_port,
            lora={
                "frequency": self.args.frequency,
                "bandwidth": self.args.bandwidth,
                "txpower": self.args.txpower,
                "spreadingfactor": self.args.spreadingfactor,
                "codingrate": self.args.codingrate,
            },
            no_radio=self.args.no_radio,
        )
        self.reticulum = RNS.Reticulum(configdir=str(configdir))
        self.identity = load_identity(self.seed)
        storagepath = str(configdir / "lxmf")
        Path(storagepath).mkdir(parents=True, exist_ok=True)
        self.router = LXMF.LXMRouter(identity=self.identity, storagepath=storagepath)
        self.delivery_destination = self.router.register_delivery_identity(
            self.identity, display_name=self.args.display_name
        )
        self.router.register_delivery_callback(self._on_lxmf_delivery)
        # Hear other LXMF nodes' announces so we can surface peers + bind contacts.
        RNS.Transport.register_announce_handler(_AnnounceHandler(self))
        assert self.delivery_destination.hash.hex() == self.dest_hash_hex, (
            "derived dest hash diverged from LXMF's — aspect/identity mismatch"
        )
        # Separate destination/aspect for inbound Resource transfers (large
        # binary payloads), so it never collides with LXMF delivery traffic.
        # Every peer running this same daemon code can derive our resource
        # destination hash from our (already-announced) identity via
        # RNS.Destination.hash(identity, "archy", "resource") — see
        # _resource_dest_hash_for, used on the sending side.
        self.resource_destination = RNS.Destination(
            self.identity, RNS.Destination.IN, RNS.Destination.SINGLE,
            "archy", "resource",
        )
        self.resource_destination.set_link_established_callback(
            self._on_resource_link_established
        )

    def announce(self):
        if self.delivery_destination is not None:
            self.delivery_destination.announce(app_data=self._announce_app_data())

    def _announce_app_data(self) -> bytes:
        """Carry the Archy identity so peers bind this RNS destination onto the
        existing contact, the same way a meshcore/Meshtastic identity advert does.

        Reuses the exact ``ARCHY:2:{ed25519_hex}:{x25519_hex}`` wire format the
        Rust side already parses (``protocol::parse_identity_broadcast``) and
        binds via ``handle_identity_received``/``bind_federation_twins`` — so a
        Reticulum-carried identity merges into the SAME conversation as the
        meshcore/Meshtastic/federation twins of the same Archy node, satisfying
        cross-protocol DM convergence. The keys are the node's real Archipelago
        ed25519/x25519 pubkeys (passed in by the Rust side, which already has
        them) — NOT this daemon's internally-HKDF-derived RNS keys, which exist
        only to make the RNS destination hash deterministic and are never
        themselves treated as an Archy identity.

        Falls back to a plain display-name string (undetected as an identity
        blob — no `ARCHY:2:` prefix) if the Archy pubkeys weren't supplied, e.g.
        a dev/selftest run with no `--archy-ed-pubkey-hex`.
        """
        if self.args.archy_ed_pubkey_hex and self.args.archy_x25519_pubkey_hex:
            return (
                f"ARCHY:2:{self.args.archy_ed_pubkey_hex}:"
                f"{self.args.archy_x25519_pubkey_hex}"
            ).encode("ascii")
        return (self.args.display_name or "").encode("utf-8")

    # ---- RNS-thread callbacks → asyncio ----
    def _on_lxmf_delivery(self, message):
        try:
            app_data = b""
            src = message.source_hash.hex() if message.source_hash else ""
            self._emit_threadsafe({
                "event": "recv",
                "source_hash": src,
                "content": message.content_as_string() if hasattr(message, "content_as_string")
                           else (message.content.decode("utf-8", "replace") if message.content else ""),
                "title": message.title_as_string() if hasattr(message, "title_as_string") else "",
                "app_data": app_data.hex(),
                "stamp": getattr(message, "timestamp", None),
            })
        except Exception as e:  # never let a callback kill the RNS thread
            self._emit_threadsafe({"event": "error", "where": "delivery", "detail": str(e)})

    def _emit_threadsafe(self, obj: dict):
        self.loop.call_soon_threadsafe(self._broadcast, obj)

    def _broadcast(self, obj: dict):
        line = (json.dumps(obj) + "\n").encode("utf-8")
        for w in list(self.clients):
            try:
                w.write(line)
            except Exception:
                self.clients.discard(w)

    # ---- RPC server ----
    async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self.clients.add(writer)
        writer.write((json.dumps({
            "event": "ready", "dest_hash": self.dest_hash_hex,
            "display_name": self.args.display_name,
        }) + "\n").encode())
        try:
            async for raw in reader:
                line = raw.decode("utf-8", "replace").strip()
                if not line:
                    continue
                try:
                    req = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(req, dict):
                    # Valid JSON that is not an object (e.g. a list) has no
                    # "cmd"; skip it instead of crashing this client's handler.
                    continue
                await self._dispatch(req, writer)
        finally:
            self.clients.discard(writer)

    async def _dispatch(self, req: dict, writer: asyncio.StreamWriter):
        cmd = req.get("cmd")
        if cmd == "send":
            self._send(req)
        elif cmd == "announce":
            self.announce()
        elif cmd == "status":
            self._broadcast(self._status())
        elif cmd == "send_resource":
            self._send_resource(req)
        elif cmd == "shutdown":
            self.loop.stop()
        # unknown verbs are ignored by contract

    def _status(self) -> dict:
        ifaces = []
        if self.reticulum is not None:
            try:
                ifaces = [str(i) for i in self.reticulum.get_interface_stats().get("interfaces", [])]
            except Exception:
                pass
        return {"event": "status", "connected": self.router is not None,
                "dest_hash": self.dest_hash_hex, "interfaces": ifaces}

    def _send(self, req: dict):
        import RNS
        import LXMF
        try:
            dest_hash = bytes.fromhex(req["dest_hash"])
        except (KeyError, ValueError):
            return
        recipient_identity = RNS.Identity.recall(dest_hash)
        if recipient_identity is None:
            # No path/identity yet — ask the network and drop this attempt; the Rust
            # side retries once the peer is reachable (mirrors LoRa "unreachable").
            RNS.Transport.request_path(dest_hash)
            self._broadcast({"event": "delivered", "dest_hash": req["dest_hash"],
                             "state": "failed", "id": "", "reason": "no_path"})
            return
        dest = RNS.Destination(recipient_identity, RNS.Destination.OUT,
                               RNS.Destination.SINGLE, "lxmf", "delivery")
        method = {"direct": LXMF.LXMessage.DIRECT,
                  "opportunistic": LXMF.LXMessage.OPPORTUNISTIC,
                  "propagated": LXMF.LXMessage.PROPAGATED}.get(
            req.get("method", "direct"), LXMF.LXMessage.DIRECT)
        msg = LXMF.LXMessage(dest, self.delivery_destination,
                             req.get("content", ""), req.get("title", ""),
                             desired_method=method)
        msg.register_delivery_callback(lambda m: self._emit_threadsafe(
            {"event": "delivered", "dest_hash": req["dest_hash"], "state": "delivered",
             "id": m.hash.hex() if m.hash else ""}))
        msg.register_failed_callback(lambda m: self._emit_threadsafe(
            {"event": "delivered", "dest_hash": req["dest_hash"], "state": "failed",
             "id": m.hash.hex() if m.hash else ""}))
        self.router.handle_outbound(msg)

    # ---- Resource transfer (large binary payloads over a dedicated Link) ----
    def _resource_dest_hash_for(self, lxmf_dest_hash: bytes):
        """Derive a peer's *resource* destination hash from their LXMF delivery
        hash. Requires having already recalled their Identity (e.g. via a prior
        `_send`/announce) — returns None if we haven't heard from them yet."""
        import RNS
        identity = RNS.Identity.recall(lxmf_dest_hash)
        if identity is None:
            return None
        return RNS.Destination.hash(identity, "archy", "resource")

    def _get_or_create_link(self, lxmf_dest_hash: bytes):
        import RNS
        identity = RNS.Identity.recall(lxmf_dest_hash)
        if identity is None:
            RNS.Transport.request_path(lxmf_dest_hash)
            return None, None
        resource_hash = RNS.Destination.hash(identity, "archy", "resource")
        link = self.links.get(resource_hash)
        if link is not None and link.status != RNS.Link.CLOSED:
            return link, resource_hash
        out_dest = RNS.Destination(
            identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
            "archy", "resource",
        )
        link = RNS.Link(out_dest)

        def on_closed(lk):
            # Drop the cached link and fail anything still queued for it, so a
            # link that closes before establishing still yields resource_failed.
            self.links.pop(resource_hash, None)
            self._flush_pending_resource_sends(resource_hash, lk)

        link.set_link_closed_callback(on_closed)
        self.links[resource_hash] = link
        return link, resource_hash

    def _send_resource(self, req: dict):
        import RNS
        req_id = req.get("id", "")
        try:
            lxmf_dest_hash = bytes.fromhex(req["dest_hash"])
            data = base64.b64decode(req["data_b64"])
        except (KeyError, ValueError, TypeError):
            self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                   "reason": "bad_request"})
            return
        link, resource_hash = self._get_or_create_link(lxmf_dest_hash)
        if link is None:
            self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                   "reason": "no_path"})
            return
        self.pending_resource_sends.setdefault(resource_hash, [])
        if link.status == RNS.Link.ACTIVE:
            self._start_resource(link, data, req_id)
        else:
            # Link is establishing — queue and flush from the established
            # callback. set_link_established_callback only fires once per Link
            # object (the cache is reused across sends), so re-set it here to
            # make sure THIS send's queue entry gets flushed too.
            self.pending_resource_sends[resource_hash].append((data, req_id))
            link.set_link_established_callback(
                lambda lk: self._flush_pending_resource_sends(resource_hash, lk)
            )

    def _flush_pending_resource_sends(self, resource_hash: bytes, link):
        import RNS
        pending = self.pending_resource_sends.pop(resource_hash, [])
        for data, req_id in pending:
            if link.status == RNS.Link.ACTIVE:
                self._start_resource(link, data, req_id)
            else:
                self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                       "reason": "link_failed"})

    def _start_resource(self, link, data: bytes, req_id: str):
        import RNS

        def on_progress(resource):
            total = resource.get_data_size() if hasattr(resource, "get_data_size") else len(data)
            transferred = int(total * resource.get_progress()) if hasattr(resource, "get_progress") else 0
            self._emit_threadsafe({"event": "resource_progress", "id": req_id,
                                   "transferred": transferred, "total": total})

        def on_concluded(resource):
            if resource.status == RNS.Resource.COMPLETE:
                self._emit_threadsafe({"event": "resource_sent", "id": req_id})
            else:
                self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                       "reason": "transfer_failed"})

        RNS.Resource(data, link, callback=on_concluded, progress_callback=on_progress)

    def _on_resource_link_established(self, link):
        import RNS
        link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
        link.set_resource_concluded_callback(self._on_resource_received)

    def _on_resource_received(self, resource):
        import RNS
        try:
            if resource.status != RNS.Resource.COMPLETE:
                return
            # NOTE: on an inbound link get_remote_identity() returns None unless
            # the initiator has called link.identify(); source_hash is then "".
            identity = resource.link.get_remote_identity()
            # Report the peer's LXMF *delivery* hash (not the raw identity hash,
            # and not the resource-aspect hash) since that's what the Rust side's
            # contact table is keyed on for every other inbound event.
            source_hash = (
                RNS.Destination.hash(identity, "lxmf", "delivery").hex()
                if identity is not None else ""
            )
            # A received Resource may expose its payload as a file-like object
            # rather than bytes; read it in that case before base64-encoding.
            payload = resource.data.read() if hasattr(resource.data, "read") else resource.data
            self._emit_threadsafe({
                "event": "resource_recv",
                "source_hash": source_hash,
                "data_b64": base64.b64encode(payload).decode("ascii"),
            })
        except Exception as e:  # never let a callback kill the RNS thread
            self._emit_threadsafe({"event": "error", "where": "resource_recv",
                                   "detail": str(e)})

    async def serve(self):
        sock_path = self.args.socket
        if os.path.exists(sock_path):
            os.unlink(sock_path)
        server = await asyncio.start_unix_server(self._handle_client, path=sock_path)
        os.chmod(sock_path, 0o600)
        self.announce()
        async with server:
            await server.serve_forever()

class _AnnounceHandler:
    """Surfaces every heard LXMF delivery announce to the Rust side."""
    aspect_filter = "lxmf.delivery"

    def __init__(self, daemon: "ReticulumDaemon"):
        self.daemon = daemon
        self.receive_path_responses = True

    def received_announce(self, destination_hash, announced_identity, app_data):
        self.daemon._emit_threadsafe({
            "event": "announce",
            "dest_hash": destination_hash.hex(),
            "app_data": (app_data or b"").hex(),
        })

# ─────────────────────────────── entrypoints ─────────────────────────────────
def _parse_args(argv):
    p = argparse.ArgumentParser(description="Archipelago Reticulum daemon")
    p.add_argument("--identity-key", required=True, help="path to Archy node_key (32-byte ed25519 seed)")
    p.add_argument("--socket", default="/tmp/archy-reticulum.sock", help="Unix RPC socket path")
    p.add_argument("--rns-config", default=str(Path.home() / ".archy-reticulum"), help="RNS config/storage dir")
    p.add_argument("--serial-port", help="RNode serial device, e.g. /dev/reticulum-radio")
    p.add_argument("--display-name", default="Archy", help="LXMF display name")
    p.add_argument("--archy-ed-pubkey-hex", default=None,
                   help="Archy ed25519 pubkey hex (64 chars) — embedded in the announce "
                        "app_data as ARCHY:2:... so peers bind this RNS destination onto "
                        "the existing Archy contact. Omit for a plain display-name announce.")
    p.add_argument("--archy-x25519-pubkey-hex", default=None,
                   help="Archy x25519 pubkey hex (64 chars), paired with --archy-ed-pubkey-hex.")
    # LoRa profile (defaults are EU_868-ish; settled by the Phase-0 spike)
    p.add_argument("--frequency", type=int, default=869525000)
    p.add_argument("--bandwidth", type=int, default=125000)
    p.add_argument("--txpower", type=int, default=17)
    p.add_argument("--spreadingfactor", type=int, default=8)
    p.add_argument("--codingrate", type=int, default=5)
    p.add_argument("--no-radio", action="store_true", help="bring up with no RNode (selftest)")
    p.add_argument("--check", action="store_true", help="print derived dest hash and exit (no RNS)")
    p.add_argument("--selftest", action="store_true", help="bring up RNS+LXMF with no radio, verify, exit")
    return p.parse_args(argv)

def main(argv=None) -> int:
    args = _parse_args(argv if argv is not None else sys.argv[1:])
    if args.check:
        seed = ReticulumDaemon._read_seed(Path(args.identity_key))
        print(lxmf_destination_hash(seed).hex())
        return 0
    daemon = ReticulumDaemon(args)
    if args.selftest:
        args.no_radio = True
        daemon.bring_up()
        print(f"selftest ok — dest_hash={daemon.dest_hash_hex} "
              f"display_name={args.display_name!r} lxmf_router=up")
        return 0
    for sig in (signal.SIGINT, signal.SIGTERM):
        daemon.loop.add_signal_handler(sig, daemon.loop.stop)
    try:
        daemon.bring_up()
        daemon.loop.run_until_complete(daemon.serve())
    except RuntimeError:
        pass  # loop.stop() during serve_forever
    finally:
        if os.path.exists(args.socket):
            os.unlink(args.socket)
    return 0

if __name__ == "__main__":
    sys.exit(main())
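
The line-delimited JSON framing described in the module docstring can be exercised from a client with a few small helpers. The following is a minimal sketch only: `encode_request`, `build_send_resource`, `parse_event` and `request_status` are hypothetical names, not part of the Archipelago Rust client, and the socket path is simply the daemon's `--socket` default.

```python
import asyncio
import base64
import json


def encode_request(cmd: str, **fields) -> bytes:
    """Serialise one request as a single JSON object terminated by a newline."""
    return (json.dumps({"cmd": cmd, **fields}) + "\n").encode("utf-8")


def build_send_resource(correlation_id: str, dest_hash_hex: str, payload: bytes) -> bytes:
    """Build a send_resource request; the binary payload travels as base64."""
    return encode_request(
        "send_resource",
        id=correlation_id,
        dest_hash=dest_hash_hex,
        data_b64=base64.b64encode(payload).decode("ascii"),
    )


def parse_event(raw: bytes):
    """Decode one event line; return None for blank, malformed or non-object lines."""
    line = raw.decode("utf-8", "replace").strip()
    if not line:
        return None
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return None
    return event if isinstance(event, dict) else None


async def request_status(sock_path: str = "/tmp/archy-reticulum.sock") -> dict:
    """Connect, ask for status, and return the first status event seen.

    Other events (the initial "ready", announces, and so on) are skipped.
    """
    reader, writer = await asyncio.open_unix_connection(sock_path)
    try:
        writer.write(encode_request("status"))
        await writer.drain()
        while True:
            raw = await reader.readline()
            if not raw:
                raise ConnectionError("daemon closed the socket")
            event = parse_event(raw)
            if event and event.get("event") == "status":
                return event
    finally:
        writer.close()
        await writer.wait_closed()
```

With a daemon running, `asyncio.run(request_status())` would return the status event as a dict. Because the daemon broadcasts events to every connected client, a real client should match `resource_*` events on the correlation `id` rather than assume the next line answers its last request.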