- Add GETTING_STARTED.md with quick start guide and development modes - Add INSTALL.sh automated installation script - Add INSTALLATION_CHECKLIST.md, INSTALLATION_SUCCESS.md, and INSTALLATION_SUMMARY.md - Add QUICK_REFERENCE.md for common commands - Add SETUP_GUIDE.md with detailed setup instructions - Update README.md with improved project overview - Add did-wallet app dependencies and node_modules
466 lines
22 KiB
JavaScript
466 lines
22 KiB
JavaScript
/**
 * Downlevel helper that drives a generator function as if it were an `async` function.
 *
 * Each value the generator yields is awaited; the settled value (or rejection reason) is fed
 * back into the generator via `next()` / `throw()`. The returned promise resolves with the
 * generator's return value, or rejects with any error that escapes the generator.
 *
 * @param thisArg - `this` value used when invoking the generator function.
 * @param _arguments - Arguments forwarded to the generator function (defaults to none).
 * @param P - Promise constructor to use; falls back to the global `Promise`.
 * @param generator - The generator function to run.
 * @returns A promise for the generator's final return value.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    const PromiseImpl = P || Promise;
    // Wrap plain values so every yielded value can be chained with `.then`.
    const toPromise = (value) => {
        if (value instanceof PromiseImpl) {
            return value;
        }
        return new PromiseImpl((settle) => { settle(value); });
    };
    return new PromiseImpl((resolve, reject) => {
        // Either finish with the generator's return value or wait on the yielded value.
        const advance = (result) => {
            if (result.done) {
                resolve(result.value);
            }
            else {
                toPromise(result.value).then(onFulfilled, onRejected);
            }
        };
        // Resume the generator with the awaited value.
        const onFulfilled = (value) => {
            try {
                advance(generator.next(value));
            }
            catch (e) {
                reject(e);
            }
        };
        // Resume the generator by throwing the rejection reason into it.
        const onRejected = (reason) => {
            try {
                advance(generator["throw"](reason));
            }
            catch (e) {
                reject(e);
            }
        };
        // Instantiate the generator and kick off the first step.
        generator = generator.apply(thisArg, _arguments || []);
        advance(generator.next());
    });
};
|
|
import ms from 'ms';
|
|
import { Level } from 'level';
|
|
import { monotonicFactory } from 'ulidx';
|
|
import { Convert, NodeStream } from '@web5/common';
|
|
import { DataStream } from '@tbd54566975/dwn-sdk-js';
|
|
import { DwnInterface } from './types/dwn.js';
|
|
import { getDwnServiceEndpointUrls, isRecordsWrite } from './utils.js';
|
|
/** Returns `true` when `code` lies in the inclusive range 200–299. */
const is2xx = (code) => {
    const lowest = 200;
    const highest = 299;
    return lowest <= code && highest >= code;
};
|
|
/** Returns `true` when `code` lies in the inclusive range 400–499. */
const is4xx = (code) => {
    const lowest = 400;
    const highest = 499;
    return lowest <= code && highest >= code;
};
|
|
/**
 * Splits a sync-queue key of the form "did~dwnUrl~watermark~messageCid" into its parts.
 *
 * `~` is a legal URL character, so a DWN endpoint URL may itself contain the separator. When
 * the key has more than four segments, the DID is taken from the front, the watermark and
 * message CID from the back, and everything in between is re-joined as the URL. Keys with four
 * or fewer segments are read positionally.
 *
 * Assumes the DID, watermark (ULID) and message CID never contain `~` — TODO confirm.
 *
 * @param key - A key produced by `SyncEngineLevel#enqueueOperations`.
 * @returns The `did`, `dwnUrl`, `watermark` and `messageCid` encoded in the key.
 */
function parseSyncQueueKey(key) {
    const segments = key.split('~');
    if (segments.length <= 4) {
        const [did, dwnUrl, watermark, messageCid] = segments;
        return { did, dwnUrl, watermark, messageCid };
    }
    return {
        did: segments[0],
        dwnUrl: segments.slice(1, -2).join('~'),
        watermark: segments[segments.length - 2],
        messageCid: segments[segments.length - 1]
    };
}

/**
 * Sync engine that keeps its bookkeeping in a Level database.
 *
 * The database is partitioned into sublevels:
 *  - `registeredIdentities`: DIDs that participate in sync
 *  - `cursors`: last event-log cursor per (DID, DWN URL, direction)
 *  - `pushQueue` / `pullQueue`: pending sync operations keyed "did~dwnUrl~watermark~messageCid"
 *  - `history/<did>/messages`: CIDs of messages already known to be synchronized
 */
export class SyncEngineLevel {
    /**
     * @param params.agent - Agent providing the `dwn`, `rpc` and `did` APIs used during sync.
     * @param params.dataPath - Location of the Level store; defaults to 'DATA/AGENT/SYNC_STORE'.
     * @param params.db - Optional pre-constructed database; when given, `dataPath` is ignored.
     */
    constructor({ agent, dataPath, db }) {
        this._agent = agent;
        this._db = (db) ? db : new Level(dataPath !== null && dataPath !== void 0 ? dataPath : 'DATA/AGENT/SYNC_STORE');
        // Monotonic ULIDs keep queue keys for the same DID/endpoint in insertion order.
        this._ulidFactory = monotonicFactory();
    }

    /**
     * Retrieves the `Web5PlatformAgent` execution context.
     *
     * @returns The `Web5PlatformAgent` instance that represents the current execution context.
     * @throws Will throw an error if the `agent` instance property is undefined.
     */
    get agent() {
        if (this._agent === undefined) {
            throw new Error('SyncEngineLevel: Unable to determine agent execution context.');
        }
        return this._agent;
    }

    set agent(agent) {
        this._agent = agent;
    }

    /** Removes every entry from the underlying database. */
    clear() {
        return __awaiter(this, void 0, void 0, function* () {
            yield this._db.clear();
        });
    }

    /** Closes the underlying database. */
    close() {
        return __awaiter(this, void 0, void 0, function* () {
            yield this._db.close();
        });
    }

    /**
     * Pulls messages from remote DWNs into the local DWN.
     *
     * New remote events are first enqueued, then every job in the pull queue is processed:
     * the message is fetched from the remote endpoint and handed to the local DWN. A job is
     * removed from the queue (and its CID recorded in the message store) when the message is
     * already known, when the remote returns an error/empty entry for it, or when the local DWN
     * answers 202 or 409. Jobs for an endpoint that failed once are skipped for the rest of
     * this run and stay queued.
     *
     * @throws If a tombstone `RecordsWrite` cannot be processed locally, or if any non-guarded
     *         agent/database call rejects.
     */
    pull() {
        return __awaiter(this, void 0, void 0, function* () {
            const syncPeerState = yield this.getSyncPeerState({ syncDirection: 'pull' });
            yield this.enqueueOperations({ syncDirection: 'pull', syncPeerState });
            const pullQueue = this.getPullQueue();
            const pullJobs = yield pullQueue.iterator().all();
            const deleteOperations = [];
            const errored = new Set();
            for (const [key] of pullJobs) {
                const { did, dwnUrl, messageCid } = parseSyncQueueKey(key);
                // If a particular DWN service endpoint is unreachable, skip subsequent pull operations.
                if (errored.has(dwnUrl)) {
                    continue;
                }
                // Already synchronized: just drop the job.
                const messageExists = yield this.messageExists(did, messageCid);
                if (messageExists) {
                    deleteOperations.push({ type: 'del', key: key });
                    continue;
                }
                const messagesGet = yield this.agent.dwn.createMessage({
                    author: did,
                    messageType: DwnInterface.MessagesGet,
                    messageParams: {
                        messageCids: [messageCid]
                    }
                });
                let reply;
                try {
                    // NOTE(review): the full `createMessage` result is passed as `message` here, whereas
                    // the RecordsRead request below passes `recordsRead.message` — confirm which shape
                    // `sendDwnRequest` expects.
                    reply = (yield this.agent.rpc.sendDwnRequest({
                        dwnUrl,
                        targetDid: did,
                        message: messagesGet
                    }));
                }
                catch (e) {
                    errored.add(dwnUrl);
                    continue;
                }
                const replyEntries = (reply.entries !== null && reply.entries !== void 0) ? reply.entries : [];
                // TODO: Refactor this to batch network requests for record messages rather than one at a time.
                // Per Moe, this loop exists because the original intent was to pass multiple messageCid
                // values to batch network requests for record messages rather than one at a time, as it
                // is currently implemented. Either the pull() method should be refactored to batch
                // getting messages OR this loop should be removed.
                for (const entry of replyEntries) {
                    if (entry.error || !entry.message) {
                        // Nothing usable came back; mark the CID as handled so it is not retried.
                        yield this.addMessage(did, messageCid);
                        deleteOperations.push({ type: 'del', key: key });
                        continue;
                    }
                    let dataStream;
                    if (isRecordsWrite(entry)) {
                        const { encodedData } = entry;
                        const message = entry.message;
                        if (encodedData) {
                            // Data was inlined in the reply as base64url.
                            const dataBytes = Convert.base64Url(encodedData).toUint8Array();
                            dataStream = DataStream.fromBytes(dataBytes);
                        }
                        else {
                            // Data was not inlined; fetch it with a RecordsRead.
                            const recordsRead = yield this.agent.dwn.createMessage({
                                author: did,
                                messageType: DwnInterface.RecordsRead,
                                messageParams: {
                                    filter: {
                                        recordId: message.recordId
                                    }
                                }
                            });
                            const recordsReadReply = yield this.agent.rpc.sendDwnRequest({
                                dwnUrl,
                                targetDid: did,
                                message: recordsRead.message
                            });
                            const { record, status: readStatus } = recordsReadReply;
                            if (is2xx(readStatus.code) && record) {
                                // If the read was successful, convert the data stream from web ReadableStream
                                // to Node.js Readable so that the DWN can process it.
                                // TODO: Remove the type assertion once sendDwnRequest type is fixed to return a ReadableStream.
                                dataStream = NodeStream.fromWebReadable({ readableStream: record.data });
                            }
                            else if (readStatus.code >= 400) {
                                // writes record without data, if this is an initial records write, it will succeed.
                                const pruneReply = yield this.agent.dwn.processMessage({
                                    targetDid: did,
                                    message
                                });
                                if (pruneReply.status.code === 202 || pruneReply.status.code === 409) {
                                    yield this.addMessage(did, messageCid);
                                    deleteOperations.push({ type: 'del', key: key });
                                    continue;
                                }
                                throw new Error(`SyncManager: Failed to sync tombstone for message '${messageCid}'`);
                            }
                        }
                    }
                    const pullReply = yield this.agent.dwn.processMessage({
                        targetDid: did,
                        message: entry.message,
                        dataStream
                    });
                    // 202: stored locally; 409: already present locally.
                    if (pullReply.status.code === 202 || pullReply.status.code === 409) {
                        yield this.addMessage(did, messageCid);
                        deleteOperations.push({ type: 'del', key: key });
                    }
                }
            }
            yield pullQueue.batch(deleteOperations);
        });
    }

    /**
     * Pushes messages from the local DWN to remote DWNs.
     *
     * New local events are first enqueued, then every job in the push queue is processed:
     * the message is read from the local DWN and sent to the remote endpoint. A job is removed
     * from the queue (and its CID recorded in the message store) when the message no longer
     * exists locally or when the remote answers 202 or 409. Jobs for an endpoint whose request
     * threw are skipped for the rest of this run and stay queued.
     */
    push() {
        return __awaiter(this, void 0, void 0, function* () {
            const syncPeerState = yield this.getSyncPeerState({ syncDirection: 'push' });
            yield this.enqueueOperations({ syncDirection: 'push', syncPeerState });
            const pushQueue = this.getPushQueue();
            const pushJobs = yield pushQueue.iterator().all();
            const deleteOperations = [];
            const errored = new Set();
            for (const [key] of pushJobs) {
                const { did, dwnUrl, messageCid } = parseSyncQueueKey(key);
                // If a particular DWN service endpoint is unreachable, skip subsequent push operations.
                if (errored.has(dwnUrl)) {
                    continue;
                }
                // Attempt to retrieve the message from the local DWN.
                const dwnMessage = yield this.getDwnMessage({ author: did, messageCid });
                // If the message does not exist on the local DWN, remove the sync operation from the
                // push queue, add the CID to the message store, and continue to the next job.
                if (!dwnMessage) {
                    deleteOperations.push({ type: 'del', key: key });
                    yield this.addMessage(did, messageCid);
                    continue;
                }
                try {
                    const reply = yield this.agent.rpc.sendDwnRequest({
                        dwnUrl,
                        targetDid: did,
                        data: dwnMessage.data,
                        message: dwnMessage.message
                    });
                    // Add the messageCid to the Sync Message Store and drop the job if either:
                    // - 202: message was successfully written to the remote DWN
                    // - 409: message was already present on the remote DWN
                    if (reply.status.code === 202 || reply.status.code === 409) {
                        yield this.addMessage(did, messageCid);
                        deleteOperations.push({ type: 'del', key: key });
                    }
                }
                catch (_a) {
                    // Error is intentionally ignored; 'errored' set is updated with 'dwnUrl'.
                    errored.add(dwnUrl);
                }
            }
            yield pushQueue.batch(deleteOperations);
        });
    }

    /**
     * Registers a DID so that it is included in subsequent push/pull runs.
     *
     * @param params.did - The DID to register.
     */
    registerIdentity({ did }) {
        return __awaiter(this, void 0, void 0, function* () {
            // Get a reference to the `registeredIdentities` sublevel.
            const registeredIdentities = this._db.sublevel('registeredIdentities');
            // Add (or overwrite, if present) the Identity's DID as a registered identity.
            yield registeredIdentities.put(did, '');
        });
    }

    /**
     * Starts a repeating push-then-pull cycle.
     *
     * The timer is cleared while a cycle runs and re-armed once it completes, so cycles never
     * overlap. If a cycle throws, sync is stopped and the returned promise rejects with that
     * error. If `stopSync()` is called while a cycle is in flight, the timer is not re-armed.
     *
     * @param params.interval - Interval string understood by `ms` (e.g. '2m').
     * @returns A promise that never resolves; it only rejects when a sync cycle fails.
     */
    startSync({ interval }) {
        // Convert the interval string to milliseconds.
        const intervalMilliseconds = ms(interval);
        return new Promise((_resolve, reject) => {
            const intervalSync = () => __awaiter(this, void 0, void 0, function* () {
                // Pause the timer for the duration of this cycle. `_syncIntervalId` keeps its
                // (now stale) value so that a `stopSync()` issued mid-cycle can be detected below.
                if (this._syncIntervalId) {
                    clearInterval(this._syncIntervalId);
                }
                try {
                    yield this.push();
                    yield this.pull();
                }
                catch (error) {
                    this.stopSync();
                    reject(error);
                    // Do not fall through: re-arming the timer here would restart a loop that
                    // was just reported as failed.
                    return;
                }
                // `stopSync()` resets the id to undefined; honor a stop requested mid-cycle.
                if (this._syncIntervalId === undefined) {
                    return;
                }
                // then we start sync again
                this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
            });
            this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
        });
    }

    /** Stops the repeating sync cycle started by `startSync`, if one is scheduled. */
    stopSync() {
        if (this._syncIntervalId) {
            clearInterval(this._syncIntervalId);
            this._syncIntervalId = undefined;
        }
    }

    /**
     * Reads the event log for every sync peer and appends one queue entry per event.
     *
     * @param params.syncDirection - 'pull' or 'push'; selects the event source and target queue.
     * @param params.syncPeerState - Items of `{ did, dwnUrl, cursor }` from `getSyncPeerState`.
     */
    enqueueOperations({ syncDirection, syncPeerState }) {
        return __awaiter(this, void 0, void 0, function* () {
            for (const syncState of syncPeerState) {
                // Get the event log from the remote DWN if pull sync, or local DWN if push sync.
                const eventLog = yield this.getDwnEventLog({
                    did: syncState.did,
                    dwnUrl: syncState.dwnUrl,
                    cursor: syncState.cursor,
                    syncDirection
                });
                const syncOperations = [];
                for (const messageCid of eventLog) {
                    const watermark = this._ulidFactory();
                    // Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue.
                    // Note: It is critical that `watermark` precedes `messageCid` to ensure that when the sync
                    // jobs are pulled off the queue, they are lexicographically sorted oldest to newest.
                    const operationKey = [
                        syncState.did,
                        syncState.dwnUrl,
                        watermark,
                        messageCid
                    ].join('~');
                    syncOperations.push({ type: 'put', key: operationKey, value: '' });
                }
                if (syncOperations.length > 0) {
                    const syncQueue = (syncDirection === 'pull')
                        ? this.getPullQueue()
                        : this.getPushQueue();
                    yield syncQueue.batch(syncOperations);
                }
            }
        });
    }

    /**
     * Fetches the event log for one (DID, DWN URL) pair and stores the returned cursor.
     *
     * For 'pull' the events are requested from the remote DWN; a failed request is ignored and
     * yields an empty log. For 'push' the events are read from the local DWN.
     *
     * @param params.did - DID whose events are requested.
     * @param params.dwnUrl - Remote DWN endpoint (used for the request on pull, and for the cursor key).
     * @param params.syncDirection - 'pull' or 'push'.
     * @param params.cursor - Cursor from the previous run, or undefined.
     * @returns The `entries` of the events reply, or an empty array when there are none.
     */
    getDwnEventLog({ did, dwnUrl, syncDirection, cursor }) {
        return __awaiter(this, void 0, void 0, function* () {
            let eventsReply = {};
            if (syncDirection === 'pull') {
                // When sync is a pull, get the event log from the remote DWN.
                const eventsGetMessage = yield this.agent.dwn.createMessage({
                    author: did,
                    messageType: DwnInterface.EventsGet,
                    messageParams: { cursor }
                });
                try {
                    eventsReply = (yield this.agent.rpc.sendDwnRequest({
                        dwnUrl: dwnUrl,
                        targetDid: did,
                        message: eventsGetMessage
                    }));
                }
                catch (_b) {
                    // If a particular DWN service endpoint is unreachable, silently ignore.
                }
            }
            else if (syncDirection === 'push') {
                // When sync is a push, get the event log from the local DWN.
                const eventsGetDwnResponse = yield this.agent.dwn.processRequest({
                    author: did,
                    target: did,
                    messageType: DwnInterface.EventsGet,
                    messageParams: { cursor }
                });
                eventsReply = eventsGetDwnResponse.reply;
            }
            const eventLog = (eventsReply.entries !== null && eventsReply.entries !== void 0) ? eventsReply.entries : [];
            if (eventsReply.cursor) {
                // Await the write so a storage failure surfaces to the caller instead of becoming
                // an unhandled rejection.
                yield this.setCursor(did, dwnUrl, syncDirection, eventsReply.cursor);
            }
            return eventLog;
        });
    }

    /**
     * Reads a message (and, for a `RecordsWrite`, its data) from the local DWN.
     *
     * @param params.author - DID used as both author and target of the local requests.
     * @param params.messageCid - CID of the message to load.
     * @returns `{ message, data? }`, or `undefined` when the local DWN does not return exactly
     *          one entry containing a message.
     * @throws If reading the record data fails with a status outside 2xx/4xx.
     */
    getDwnMessage({ author, messageCid }) {
        return __awaiter(this, void 0, void 0, function* () {
            const { reply } = yield this.agent.dwn.processRequest({
                author: author,
                target: author,
                messageType: DwnInterface.MessagesGet,
                messageParams: {
                    messageCids: [messageCid]
                }
            });
            // Absence of a messageEntry or message within messageEntry can happen because updating a
            // Record creates another RecordsWrite with the same recordId. Only the first and
            // most recent RecordsWrite messages are kept for a given recordId. Any RecordsWrite messages
            // that aren't the first or most recent are discarded by the DWN.
            if (!(reply.entries && reply.entries.length === 1)) {
                return undefined;
            }
            const [messageEntry] = reply.entries;
            const message = messageEntry.message;
            if (!message) {
                return undefined;
            }
            const dwnMessageWithBlob = { message };
            // If the message is a RecordsWrite, either data will be present,
            // OR we have to fetch it using a RecordsRead.
            if (isRecordsWrite(messageEntry)) {
                if (messageEntry.encodedData) {
                    const dataBytes = Convert.base64Url(messageEntry.encodedData).toUint8Array();
                    // ! TODO: test adding the messageEntry.message.descriptor.dataFormat to the Blob constructor.
                    dwnMessageWithBlob.data = new Blob([dataBytes]);
                }
                else {
                    const readResponse = yield this.agent.dwn.processRequest({
                        author: author,
                        target: author,
                        messageType: DwnInterface.RecordsRead,
                        messageParams: { filter: { recordId: messageEntry.message.recordId } }
                    });
                    const readReply = readResponse.reply;
                    if (is2xx(readReply.status.code) && readReply.record) {
                        // If status code is 200-299, return the data.
                        dwnMessageWithBlob.data = yield NodeStream.consumeToBlob({ readable: readReply.record.data });
                    }
                    else if (is4xx(readReply.status.code)) {
                        // If status code is 400-499, typically 404 indicating the data no longer exists, it is
                        // likely that a `RecordsDelete` took place. `RecordsDelete` keeps a `RecordsWrite` and
                        // deletes the associated data, effectively acting as a "tombstone." Sync still needs to
                        // push this tombstone so that the `RecordsDelete` can be processed successfully.
                    }
                    else {
                        // If status code is anything else (likely 5xx), throw an error.
                        const { status: { code, detail } } = readReply;
                        throw new Error(`SyncEngineLevel: (${code}) Failed to read data associated with record ${messageEntry.message.recordId}. ${detail}`);
                    }
                }
            }
            return dwnMessageWithBlob;
        });
    }

    /**
     * Builds the list of sync peers for every registered identity.
     *
     * @param params.syncDirection - 'pull' or 'push'; selects which stored cursor is returned.
     * @returns One `{ did, dwnUrl, cursor }` item per DWN endpoint of each registered DID.
     */
    getSyncPeerState({ syncDirection }) {
        return __awaiter(this, void 0, void 0, function* () {
            // Get a list of the DIDs of all registered identities.
            const registeredIdentities = yield this._db.sublevel('registeredIdentities').keys().all();
            // Array to accumulate the list of sync peers for each DID.
            const syncPeerState = [];
            for (const did of registeredIdentities) {
                // First, confirm the DID can be resolved and extract the DWN service endpoint URLs.
                const dwnEndpointUrls = yield getDwnServiceEndpointUrls(did, this.agent.did);
                if (dwnEndpointUrls.length === 0) {
                    // Silently ignore and do not try to perform Sync for any DID that does not have a DWN
                    // service endpoint published in its DID document.
                    continue;
                }
                // Get the cursor (or undefined) for each (DID, DWN service endpoint, sync direction)
                // combination and add it to the sync peer state array.
                for (const dwnUrl of dwnEndpointUrls) {
                    const cursor = yield this.getCursor(did, dwnUrl, syncDirection);
                    syncPeerState.push({ did, dwnUrl, cursor });
                }
            }
            return syncPeerState;
        });
    }

    /**
     * Loads the stored cursor for a (DID, DWN URL, direction) combination.
     *
     * @returns The parsed cursor, or `undefined` when none is stored.
     */
    getCursor(did, dwnUrl, direction) {
        return __awaiter(this, void 0, void 0, function* () {
            const cursorKey = `${did}~${dwnUrl}~${direction}`;
            const cursorsStore = this.getCursorStore();
            try {
                const cursorValue = yield cursorsStore.get(cursorKey);
                if (cursorValue) {
                    return JSON.parse(cursorValue);
                }
            }
            catch (error) {
                // Don't throw when a key wasn't found.
                if (error.notFound) {
                    return undefined;
                }
                // NOTE(review): any other read/parse error is also swallowed and results in
                // `undefined` (unlike `messageExists`, which rethrows) — confirm this is intended.
            }
        });
    }

    /** Persists `cursor` (JSON-encoded) for a (DID, DWN URL, direction) combination. */
    setCursor(did, dwnUrl, direction, cursor) {
        return __awaiter(this, void 0, void 0, function* () {
            const cursorKey = `${did}~${dwnUrl}~${direction}`;
            const cursorsStore = this.getCursorStore();
            yield cursorsStore.put(cursorKey, JSON.stringify(cursor));
        });
    }

    /**
     * The message store is used to prevent "echoes" that occur during a sync pull operation.
     * After a message is confirmed to already be synchronized on the local DWN, its CID is added
     * to the message store to ensure that any subsequent pull attempts are skipped.
     *
     * @returns `true` if `messageCid` is recorded for `did`, `false` if the lookup reports not-found.
     * @throws Any lookup error that is not a not-found error.
     */
    messageExists(did, messageCid) {
        return __awaiter(this, void 0, void 0, function* () {
            const messageStore = this.getMessageStore(did);
            // If the `messageCid` exists in this DID's store, return true. Otherwise, return false.
            try {
                yield messageStore.get(messageCid);
                return true;
            }
            catch (error) {
                if (error.notFound) {
                    return false;
                }
                throw error;
            }
        });
    }

    /** Records `messageCid` in the message store of `did`. */
    addMessage(did, messageCid) {
        return __awaiter(this, void 0, void 0, function* () {
            const messageStore = this.getMessageStore(did);
            return yield messageStore.put(messageCid, '');
        });
    }

    /** Returns the `history/<did>/messages` sublevel. */
    getMessageStore(did) {
        return this._db.sublevel('history').sublevel(did).sublevel('messages');
    }

    /** Returns the `cursors` sublevel. */
    getCursorStore() {
        return this._db.sublevel('cursors');
    }

    /** Returns the `pushQueue` sublevel. */
    getPushQueue() {
        return this._db.sublevel('pushQueue');
    }

    /** Returns the `pullQueue` sublevel. */
    getPullQueue() {
        return this._db.sublevel('pullQueue');
    }
}
|
|
//# sourceMappingURL=sync-engine-level.js.map
|