archy/reticulum-daemon/reticulum_daemon.py
archipelago 0eb5c258f5 fix(mesh): Meshtastic 3ccc pkc_capable pill + Sideband image interop + critical CBOR wire-bloat fix
Merges in the meshtastic agent's now-finished work alongside this session's
continuation: stock-peer (3ccc) PKI-capability is now stamped through
get_contacts -> refresh_contacts -> MeshPeer.pkc_capable, so a directed DM to/from
a PKC-capable stock Meshtastic peer correctly shows the E2E pill on the Sent row,
not just received messages. Confirmed live: .198 sees "Meshtastic 3ccc" with
pkc_capable=true.

Also fixes three real interop/correctness bugs found while live-testing the
Reticulum <-> Sideband link:
  - Receive: the daemon only ever read LXMF's plain-text content, silently
    dropping native FIELD_IMAGE/FIELD_FILE_ATTACHMENTS fields — a stock
    Sideband/NomadNet photo vanished into a blank-space message. Now decoded
    into the same ContentInline typed envelope our own attachments use.
  - Send: images to a non-archy (stock) peer now use native LXMF FIELD_IMAGE
    instead of our own opaque CBOR wire format, which Sideband can't decode.
  - Root cause of a garbled MC-chunk-fragment bug: TypedEnvelope.v/.sig (the
    OUTER wrapper every message type uses) serialized raw bytes as a CBOR
    array-of-integers instead of a native byte string, bloating every
    message on the wire ~2-3.5x — enough to push even a tiny ReadReceipt
    over the 140-byte single-frame chunking threshold. Root-caused by
    reading ciborium's deserializer source directly (deserialize_bytes only
    works within its internal scratch buffer; deserialize_byte_buf streams
    unbounded).
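
The size effect described in the last bullet can be illustrated with a small sketch. It hand-rolls only the CBOR header rules from RFC 8949 (it does not use ciborium or the project's TypedEnvelope), and the 64-byte `sig` value is a hypothetical stand-in for a signature, so the numbers show the per-field effect only, not the whole-message ratio quoted above.

```python
def cbor_head(major: int, n: int) -> bytes:
    """Encode a CBOR initial byte plus its length/value argument."""
    if n < 24:
        return bytes([(major << 5) | n])
    if n < 0x100:
        return bytes([(major << 5) | 24, n])
    if n < 0x10000:
        return bytes([(major << 5) | 25]) + n.to_bytes(2, "big")
    if n < 0x100000000:
        return bytes([(major << 5) | 26]) + n.to_bytes(4, "big")
    return bytes([(major << 5) | 27]) + n.to_bytes(8, "big")


def as_byte_string(data: bytes) -> bytes:
    # Major type 2: one header, then the raw bytes unchanged.
    return cbor_head(2, len(data)) + data


def as_int_array(data: bytes) -> bytes:
    # Major type 4 array whose elements are major type 0 unsigned ints.
    # Any byte value >= 24 costs two bytes on the wire instead of one.
    return cbor_head(4, len(data)) + b"".join(cbor_head(0, b) for b in data)


sig = bytes(range(64, 128))  # hypothetical 64-byte signature, every byte >= 24
compact = as_byte_string(sig)
bloated = as_int_array(sig)
print(len(compact), len(bloated))  # prints "66 130"
```

For uniformly random bytes about 232 of the 256 possible values need the two-byte form, so an array-of-integers field comes out close to twice the size of the same field as a byte string.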

Frontend: consolidated the attach/record buttons into a single animated "+"
menu (was overflowing the compose row).

857/857 tests pass. Verified live across all 5 deploy-roster nodes.

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
2026-06-30 22:07:45 -04:00

537 lines
25 KiB
Python

#!/usr/bin/env python3
"""Archipelago Reticulum daemon — host-supervised RNS + LXMF bridge.

archipelago spawns and supervises one of these per active Reticulum (RNode) radio.
It owns the serial port, runs the Reticulum stack + an LXMF router whose identity is
**derived deterministically from the Archy node key**, and exposes a tiny
line-delimited JSON-RPC over a Unix domain socket (0600) for the Rust side to drive.

Security posture (see the plan's "most secure way" section):
  * Runs as the same rootless archipelago user; no root, no network control plane.
  * RPC is a Unix socket only; the identity key never leaves the host and is never
    logged. The daemon executes ONLY the fixed verb set below.

RPC (one JSON object per line, both directions):
  in : {"cmd":"send","dest_hash":"<hex16>","content":"","title":"","method":"direct|opportunistic|propagated"}
       {"cmd":"announce"}
       {"cmd":"status"}
       {"cmd":"send_resource","id":"<correlation>","dest_hash":"<hex16>","data_b64":""}
       {"cmd":"shutdown"}
  out: {"event":"ready","dest_hash":"<hex16>","display_name":""}
       {"event":"recv","source_hash":"<hex16>","content":"","title":"","fields":{…},"app_data":"<hex>","rssi":n,"snr":n,"stamp":t}
       {"event":"announce","dest_hash":"<hex16>","app_data":"<hex>"}
       {"event":"delivered","dest_hash":"<hex16>","state":"delivered|failed","id":"<hex>"}
       {"event":"status","connected":bool,"dest_hash":"<hex16>","interfaces":[…]}
       {"event":"resource_progress","id":"<correlation>","transferred":n,"total":n}
       {"event":"resource_sent","id":"<correlation>"}
       {"event":"resource_failed","id":"<correlation>","reason":""}
       {"event":"resource_recv","source_hash":"<hex16>","data_b64":""}

``send`` also accepts optional ``image_b64`` + ``image_format`` keys, which are sent
as a native LXMF FIELD_IMAGE for stock (non-archy) peers. ``recv`` carries
``image_format``/``image_b64`` and ``attachment_filename``/``attachment_b64`` when the
sender used native LXMF attachment fields; only the first file attachment is
surfaced. A failed ``delivered`` event may include ``"reason":"no_path"``, and a
callback exception is reported as ``{"event":"error","where":"","detail":""}``.

``send_resource`` is for large (>~2.3KB) binary payloads that don't fit the small
LXMF-message path — it uses RNS's native Resource transfer protocol over a `RNS.Link`
to the peer's *resource* destination (a separate aspect from the LXMF delivery
destination, so it doesn't disturb normal messaging). Built for sending compressed
photos/files/voice-messages directly over LoRa instead of always falling back to Tor
past the small-message size cap. ``resource_recv``'s `data_b64` is the same
CBOR-encoded payload format already used for the small-inline LoRa path, so the Rust
side decodes it identically regardless of which path delivered it.

This is the Phase-1 skeleton: the identity/LXMF wiring and RPC loop are real and
exercised by ``--check`` / ``--selftest`` (no radio). The live LoRa message path is
validated in the Phase-0 hardware gate on .116/.228.
"""
from __future__ import annotations
import argparse
import asyncio
import base64
import json
import os
import signal
import sys
from pathlib import Path
from archy_rns_identity import lxmf_destination_hash, load_identity
# Lazy heavy imports (RNS/LXMF) happen in run(); --check stays import-light.


# ─────────────────────────── RNS config generation ───────────────────────────

def _write_rns_config(configdir: Path, *, serial_port: str | None, lora: dict, no_radio: bool) -> None:
    """Materialise an RNS config file. RNode interface for real radios; a loopback-
    only config for --selftest so the stack comes up without hardware or network."""
    configdir.mkdir(parents=True, exist_ok=True)
    cfg = configdir / "config"
    if no_radio:
        interfaces = (
            " [[Default Interface]]\n"
            " type = AutoInterface\n"
            " enabled = no\n"
        )
    else:
        interfaces = (
            " [[RNode LoRa]]\n"
            " type = RNodeInterface\n"
            " enabled = yes\n"
            f" port = {serial_port}\n"
            f" frequency = {lora['frequency']}\n"
            f" bandwidth = {lora['bandwidth']}\n"
            f" txpower = {lora['txpower']}\n"
            f" spreadingfactor = {lora['spreadingfactor']}\n"
            f" codingrate = {lora['codingrate']}\n"
        )
    cfg.write_text(
        "[reticulum]\n"
        " enable_transport = no\n"
        " share_instance = no\n"
        " panic_on_interface_error = no\n\n"
        "[interfaces]\n" + interfaces
    )
    os.chmod(cfg, 0o600)


# ─────────────────────────────── the daemon ──────────────────────────────────

class ReticulumDaemon:
    def __init__(self, args):
        self.args = args
        self.seed = self._read_seed(Path(args.identity_key))
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.clients: set[asyncio.StreamWriter] = set()
        self.reticulum = None
        self.router = None
        self.delivery_destination = None
        self.identity = None
        self.dest_hash_hex = lxmf_destination_hash(self.seed).hex()
        # Resource transfer (large binary payloads over a dedicated Link, separate
        # from LXMF delivery — see module docstring). Keyed by the peer's *resource*
        # destination hash (bytes), not their LXMF delivery hash.
        self.resource_destination = None
        self.links: dict[bytes, "RNS.Link"] = {}
        self.pending_resource_sends: dict[bytes, list[tuple[bytes, str]]] = {}

    @staticmethod
    def _read_seed(path: Path) -> bytes:
        seed = path.read_bytes()
        if len(seed) != 32:
            raise SystemExit(f"identity key {path} must be 32 bytes, got {len(seed)}")
        return seed

    # ---- RNS / LXMF bring-up ----
    def bring_up(self):
        import RNS
        import LXMF

        configdir = Path(self.args.rns_config)
        _write_rns_config(
            configdir,
            serial_port=self.args.serial_port,
            lora={
                "frequency": self.args.frequency,
                "bandwidth": self.args.bandwidth,
                "txpower": self.args.txpower,
                "spreadingfactor": self.args.spreadingfactor,
                "codingrate": self.args.codingrate,
            },
            no_radio=self.args.no_radio,
        )
        self.reticulum = RNS.Reticulum(configdir=str(configdir))
        self.identity = load_identity(self.seed)
        storagepath = str(configdir / "lxmf")
        Path(storagepath).mkdir(parents=True, exist_ok=True)
        self.router = LXMF.LXMRouter(identity=self.identity, storagepath=storagepath)
        self.delivery_destination = self.router.register_delivery_identity(
            self.identity, display_name=self.args.display_name
        )
        self.router.register_delivery_callback(self._on_lxmf_delivery)
        # Hear other LXMF nodes' announces so we can surface peers + bind contacts.
        RNS.Transport.register_announce_handler(_AnnounceHandler(self))
        assert self.delivery_destination.hash.hex() == self.dest_hash_hex, (
            "derived dest hash diverged from LXMF's — aspect/identity mismatch"
        )
        # Separate destination/aspect for inbound Resource transfers (large
        # binary payloads), so it never collides with LXMF delivery traffic.
        # Every peer running this same daemon code can derive our resource
        # destination hash from our (already-announced) identity via
        # RNS.Destination.hash(identity, "archy", "resource") — see
        # _resource_dest_hash_for, used on the sending side.
        self.resource_destination = RNS.Destination(
            self.identity, RNS.Destination.IN, RNS.Destination.SINGLE,
            "archy", "resource",
        )
        self.resource_destination.set_link_established_callback(
            self._on_resource_link_established
        )

    def announce(self):
        if self.delivery_destination is not None:
            self.delivery_destination.announce(app_data=self._announce_app_data())

    def _announce_app_data(self) -> bytes:
        """Carry the Archy identity so peers bind this RNS destination onto the
        existing contact, the same way a meshcore/Meshtastic identity advert does.

        Reuses the exact ``ARCHY:2:{ed25519_hex}:{x25519_hex}`` wire format the
        Rust side already parses (``protocol::parse_identity_broadcast``) and
        binds via ``handle_identity_received``/``bind_federation_twins`` — so a
        Reticulum-carried identity merges into the SAME conversation as the
        meshcore/Meshtastic/federation twins of the same Archy node, satisfying
        cross-protocol DM convergence. The keys are the node's real Archipelago
        ed25519/x25519 pubkeys (passed in by the Rust side, which already has
        them) — NOT this daemon's internally-HKDF-derived RNS keys, which exist
        only to make the RNS destination hash deterministic and are never
        themselves treated as an Archy identity.

        Falls back to a plain display-name string (undetected as an identity
        blob — no `ARCHY:2:` prefix) if the Archy pubkeys weren't supplied, e.g.
        a dev/selftest run with no `--archy-ed-pubkey-hex`.
        """
        if self.args.archy_ed_pubkey_hex and self.args.archy_x25519_pubkey_hex:
            return (
                f"ARCHY:2:{self.args.archy_ed_pubkey_hex}:"
                f"{self.args.archy_x25519_pubkey_hex}"
            ).encode("ascii")
        return (self.args.display_name or "").encode("utf-8")

    # ---- RNS-thread callbacks → asyncio ----
    def _on_lxmf_delivery(self, message):
        import LXMF

        try:
            app_data = b""
            src = message.source_hash.hex() if message.source_hash else ""
            event = {
                "event": "recv",
                "source_hash": src,
                "content": message.content_as_string() if hasattr(message, "content_as_string")
                else (message.content.decode("utf-8", "replace") if message.content else ""),
                "title": message.title_as_string() if hasattr(message, "title_as_string") else "",
                "app_data": app_data.hex(),
                "stamp": getattr(message, "timestamp", None),
            }
            # Native LXMF attachment fields (Sideband/NomadNet/stock clients use
            # these, NOT our own typed-envelope wire format) — a stock client's
            # photo/voice-memo/file arrives here, not in `content`, which is why
            # it was previously dropped silently (content was just blank/space).
            # See LXMF field format confirmed against Sideband's own source
            # (sbapp/sideband/core.py): FIELD_IMAGE = [format_str, bytes],
            # FIELD_AUDIO = [mode_byte, bytes], FIELD_FILE_ATTACHMENTS =
            # [[filename, bytes], ...].
            fields = getattr(message, "fields", None) or {}
            if LXMF.FIELD_IMAGE in fields:
                fmt, img_bytes = fields[LXMF.FIELD_IMAGE]
                event["image_format"] = str(fmt)
                event["image_b64"] = base64.b64encode(bytes(img_bytes)).decode("ascii")
            if LXMF.FIELD_FILE_ATTACHMENTS in fields:
                attachments = fields[LXMF.FIELD_FILE_ATTACHMENTS]
                if attachments:
                    filename, file_bytes = attachments[0]
                    event["attachment_filename"] = str(filename)
                    event["attachment_b64"] = base64.b64encode(bytes(file_bytes)).decode("ascii")
            self._emit_threadsafe(event)
        except Exception as e:  # never let a callback kill the RNS thread
            self._emit_threadsafe({"event": "error", "where": "delivery", "detail": str(e)})

    def _emit_threadsafe(self, obj: dict):
        self.loop.call_soon_threadsafe(self._broadcast, obj)

    def _broadcast(self, obj: dict):
        line = (json.dumps(obj) + "\n").encode("utf-8")
        for w in list(self.clients):
            try:
                w.write(line)
            except Exception:
                self.clients.discard(w)

    # ---- RPC server ----
    async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self.clients.add(writer)
        writer.write((json.dumps({
            "event": "ready", "dest_hash": self.dest_hash_hex,
            "display_name": self.args.display_name,
        }) + "\n").encode())
        try:
            async for raw in reader:
                line = raw.decode("utf-8", "replace").strip()
                if not line:
                    continue
                try:
                    req = json.loads(line)
                except json.JSONDecodeError:
                    continue
                await self._dispatch(req, writer)
        finally:
            self.clients.discard(writer)

    async def _dispatch(self, req: dict, writer: asyncio.StreamWriter):
        cmd = req.get("cmd")
        if cmd == "send":
            self._send(req)
        elif cmd == "announce":
            self.announce()
        elif cmd == "status":
            self._broadcast(self._status())
        elif cmd == "send_resource":
            self._send_resource(req)
        elif cmd == "shutdown":
            self.loop.stop()
        # unknown verbs are ignored by contract

    def _status(self) -> dict:
        ifaces = []
        if self.reticulum is not None:
            try:
                ifaces = [str(i) for i in self.reticulum.get_interface_stats().get("interfaces", [])]
            except Exception:
                pass
        return {"event": "status", "connected": self.router is not None,
                "dest_hash": self.dest_hash_hex, "interfaces": ifaces}

    def _send(self, req: dict):
        import RNS
        import LXMF

        try:
            dest_hash = bytes.fromhex(req["dest_hash"])
        except (KeyError, ValueError):
            return
        recipient_identity = RNS.Identity.recall(dest_hash)
        if recipient_identity is None:
            # No path/identity yet — ask the network and drop this attempt; the Rust
            # side retries once the peer is reachable (mirrors LoRa "unreachable").
            RNS.Transport.request_path(dest_hash)
            self._broadcast({"event": "delivered", "dest_hash": req["dest_hash"],
                             "state": "failed", "id": "", "reason": "no_path"})
            return
        dest = RNS.Destination(recipient_identity, RNS.Destination.OUT,
                               RNS.Destination.SINGLE, "lxmf", "delivery")
        method = {"direct": LXMF.LXMessage.DIRECT,
                  "opportunistic": LXMF.LXMessage.OPPORTUNISTIC,
                  "propagated": LXMF.LXMessage.PROPAGATED}.get(
            req.get("method", "direct"), LXMF.LXMessage.DIRECT)
        # Native LXMF FIELD_IMAGE — for a stock Sideband/NomadNet peer, which
        # has no idea how to decode our own typed-envelope wire format. Rust
        # only sets these two keys when the peer isn't an archy contact (see
        # `is_archy_peer` gating in typed_messages.rs); [format, bytes] is the
        # wire shape confirmed against Sideband's own source.
        fields = {}
        if req.get("image_b64") and req.get("image_format"):
            fields[LXMF.FIELD_IMAGE] = [req["image_format"], base64.b64decode(req["image_b64"])]
        msg = LXMF.LXMessage(dest, self.delivery_destination,
                             req.get("content", ""), req.get("title", ""),
                             desired_method=method, fields=fields or None)
        msg.register_delivery_callback(lambda m: self._emit_threadsafe(
            {"event": "delivered", "dest_hash": req["dest_hash"], "state": "delivered",
             "id": m.hash.hex() if m.hash else ""}))
        msg.register_failed_callback(lambda m: self._emit_threadsafe(
            {"event": "delivered", "dest_hash": req["dest_hash"], "state": "failed",
             "id": m.hash.hex() if m.hash else ""}))
        self.router.handle_outbound(msg)

    # ---- Resource transfer (large binary payloads over a dedicated Link) ----
    def _resource_dest_hash_for(self, lxmf_dest_hash: bytes):
        """Derive a peer's *resource* destination hash from their LXMF delivery
        hash. Requires having already recalled their Identity (e.g. via a prior
        `_send`/announce) — returns None if we haven't heard from them yet."""
        import RNS

        identity = RNS.Identity.recall(lxmf_dest_hash)
        if identity is None:
            return None
        return RNS.Destination.hash(identity, "archy", "resource")

    def _get_or_create_link(self, lxmf_dest_hash: bytes):
        import RNS

        identity = RNS.Identity.recall(lxmf_dest_hash)
        if identity is None:
            RNS.Transport.request_path(lxmf_dest_hash)
            return None, None
        resource_hash = RNS.Destination.hash(identity, "archy", "resource")
        link = self.links.get(resource_hash)
        if link is not None and link.status != RNS.Link.CLOSED:
            return link, resource_hash
        out_dest = RNS.Destination(
            identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
            "archy", "resource",
        )
        link = RNS.Link(out_dest)
        link.set_link_closed_callback(
            lambda lk: self.links.pop(resource_hash, None)
        )
        self.links[resource_hash] = link
        return link, resource_hash

    def _send_resource(self, req: dict):
        import RNS

        req_id = req.get("id", "")
        try:
            lxmf_dest_hash = bytes.fromhex(req["dest_hash"])
            data = base64.b64decode(req["data_b64"])
        except (KeyError, ValueError, TypeError):
            self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                   "reason": "bad_request"})
            return
        link, resource_hash = self._get_or_create_link(lxmf_dest_hash)
        if link is None:
            self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                   "reason": "no_path"})
            return
        self.pending_resource_sends.setdefault(resource_hash, [])
        if link.status == RNS.Link.ACTIVE:
            self._start_resource(link, data, req_id)
        else:
            # Link is establishing — queue and flush from the established
            # callback. set_link_established_callback only fires once per Link
            # object (the cache is reused across sends), so re-set it here to
            # make sure THIS send's queue entry gets flushed too.
            self.pending_resource_sends[resource_hash].append((data, req_id))
            link.set_link_established_callback(
                lambda lk: self._flush_pending_resource_sends(resource_hash, lk)
            )

    def _flush_pending_resource_sends(self, resource_hash: bytes, link):
        import RNS

        pending = self.pending_resource_sends.pop(resource_hash, [])
        for data, req_id in pending:
            if link.status == RNS.Link.ACTIVE:
                self._start_resource(link, data, req_id)
            else:
                self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                       "reason": "link_failed"})

    def _start_resource(self, link, data: bytes, req_id: str):
        import RNS

        def on_progress(resource):
            total = resource.get_data_size() if hasattr(resource, "get_data_size") else len(data)
            transferred = int(total * resource.get_progress()) if hasattr(resource, "get_progress") else 0
            self._emit_threadsafe({"event": "resource_progress", "id": req_id,
                                   "transferred": transferred, "total": total})

        def on_concluded(resource):
            if resource.status == RNS.Resource.COMPLETE:
                self._emit_threadsafe({"event": "resource_sent", "id": req_id})
            else:
                self._emit_threadsafe({"event": "resource_failed", "id": req_id,
                                       "reason": "transfer_failed"})

        RNS.Resource(data, link, callback=on_concluded, progress_callback=on_progress)

    def _on_resource_link_established(self, link):
        import RNS

        link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
        link.set_resource_concluded_callback(self._on_resource_received)

    def _on_resource_received(self, resource):
        import RNS

        try:
            if resource.status != RNS.Resource.COMPLETE:
                return
            identity = resource.link.get_remote_identity()
            # Report the peer's LXMF *delivery* hash (not the raw identity hash,
            # and not the resource-aspect hash) since that's what the Rust side's
            # contact table is keyed on for every other inbound event.
            source_hash = (
                RNS.Destination.hash(identity, "lxmf", "delivery").hex()
                if identity is not None else ""
            )
            # A completed inbound RNS.Resource may expose its payload as an open
            # file handle rather than bytes; read it out in that case so the
            # base64 step always receives bytes.
            payload = resource.data.read() if hasattr(resource.data, "read") else resource.data
            self._emit_threadsafe({
                "event": "resource_recv",
                "source_hash": source_hash,
                "data_b64": base64.b64encode(payload).decode("ascii"),
            })
        except Exception as e:  # never let a callback kill the RNS thread
            self._emit_threadsafe({"event": "error", "where": "resource_recv",
                                   "detail": str(e)})

    async def serve(self):
        sock_path = self.args.socket
        if os.path.exists(sock_path):
            os.unlink(sock_path)
        server = await asyncio.start_unix_server(self._handle_client, path=sock_path)
        os.chmod(sock_path, 0o600)
        self.announce()
        async with server:
            await server.serve_forever()


class _AnnounceHandler:
    """Surfaces every heard LXMF delivery announce to the Rust side."""
    aspect_filter = "lxmf.delivery"

    def __init__(self, daemon: "ReticulumDaemon"):
        self.daemon = daemon
        self.receive_path_responses = True

    def received_announce(self, destination_hash, announced_identity, app_data):
        self.daemon._emit_threadsafe({
            "event": "announce",
            "dest_hash": destination_hash.hex(),
            "app_data": (app_data or b"").hex(),
        })


# ─────────────────────────────── entrypoints ─────────────────────────────────

def _parse_args(argv):
    p = argparse.ArgumentParser(description="Archipelago Reticulum daemon")
    p.add_argument("--identity-key", required=True, help="path to Archy node_key (32-byte ed25519 seed)")
    p.add_argument("--socket", default="/tmp/archy-reticulum.sock", help="Unix RPC socket path")
    p.add_argument("--rns-config", default=str(Path.home() / ".archy-reticulum"), help="RNS config/storage dir")
    p.add_argument("--serial-port", help="RNode serial device, e.g. /dev/reticulum-radio")
    p.add_argument("--display-name", default="Archy", help="LXMF display name")
    p.add_argument("--archy-ed-pubkey-hex", default=None,
                   help="Archy ed25519 pubkey hex (64 chars) — embedded in the announce "
                        "app_data as ARCHY:2:... so peers bind this RNS destination onto "
                        "the existing Archy contact. Omit for a plain display-name announce.")
    p.add_argument("--archy-x25519-pubkey-hex", default=None,
                   help="Archy x25519 pubkey hex (64 chars), paired with --archy-ed-pubkey-hex.")
    # LoRa profile (defaults are EU_868-ish; settled by the Phase-0 spike)
    p.add_argument("--frequency", type=int, default=869525000)
    p.add_argument("--bandwidth", type=int, default=125000)
    p.add_argument("--txpower", type=int, default=17)
    p.add_argument("--spreadingfactor", type=int, default=8)
    p.add_argument("--codingrate", type=int, default=5)
    p.add_argument("--no-radio", action="store_true", help="bring up with no RNode (selftest)")
    p.add_argument("--check", action="store_true", help="print derived dest hash and exit (no RNS)")
    p.add_argument("--selftest", action="store_true", help="bring up RNS+LXMF with no radio, verify, exit")
    return p.parse_args(argv)


def main(argv=None) -> int:
    args = _parse_args(argv if argv is not None else sys.argv[1:])
    if args.check:
        seed = ReticulumDaemon._read_seed(Path(args.identity_key))
        print(lxmf_destination_hash(seed).hex())
        return 0
    daemon = ReticulumDaemon(args)
    if args.selftest:
        args.no_radio = True
        daemon.bring_up()
        print(f"selftest ok — dest_hash={daemon.dest_hash_hex} "
              f"display_name={args.display_name!r} lxmf_router=up")
        return 0
    for sig in (signal.SIGINT, signal.SIGTERM):
        daemon.loop.add_signal_handler(sig, daemon.loop.stop)
    try:
        daemon.bring_up()
        daemon.loop.run_until_complete(daemon.serve())
    except RuntimeError:
        pass  # loop.stop() during serve_forever
    finally:
        if os.path.exists(args.socket):
            os.unlink(args.socket)
    return 0


if __name__ == "__main__":
    sys.exit(main())
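
The line-delimited JSON-RPC described in the module docstring can be exercised from Python with a sketch like the one below. It is not part of the daemon: `rpc_call` and `_stub_daemon` are hypothetical names, and the stub only imitates the `ready` greeting and the `status` verb so the example runs without RNS, LXMF, or a radio. Against a real daemon the socket path would be whatever `--socket` was set to (default `/tmp/archy-reticulum.sock`).

```python
import asyncio
import json
import os
import tempfile


async def rpc_call(sock_path: str, request: dict, want_event: str) -> dict:
    """Send one request line and return the first event named `want_event`."""
    reader, writer = await asyncio.open_unix_connection(sock_path)
    try:
        # The daemon greets every new client with a "ready" event first.
        ready = json.loads(await reader.readline())
        assert ready["event"] == "ready"
        writer.write((json.dumps(request) + "\n").encode("utf-8"))
        await writer.drain()
        while True:
            line = await reader.readline()
            if not line:
                raise ConnectionError("daemon closed the socket")
            event = json.loads(line)
            # Events are broadcast to all clients, so skip unrelated ones.
            if event.get("event") == want_event:
                return event
    finally:
        writer.close()
        await writer.wait_closed()


async def _stub_daemon(reader, writer):
    """Hypothetical stand-in for the daemon: greeting plus the status verb."""
    writer.write(b'{"event":"ready","dest_hash":"00","display_name":"stub"}\n')
    async for raw in reader:
        if json.loads(raw).get("cmd") == "status":
            writer.write(
                b'{"event":"status","connected":false,"dest_hash":"00","interfaces":[]}\n'
            )
    writer.close()


async def demo() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "rpc.sock")
        server = await asyncio.start_unix_server(_stub_daemon, path=path)
        async with server:
            return await rpc_call(path, {"cmd": "status"}, "status")


result = asyncio.run(demo())
print(result["event"], result["connected"])  # prints "status False"
```

Because the daemon broadcasts every event to every connected client, a caller that needs request/response pairing has to filter by event name (and by `id` for `send_resource`), as the loop in `rpc_call` does.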