var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } return new (P || (P = Promise))(function (resolve, reject) { function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } step((generator = generator.apply(thisArg, _arguments || [])).next()); }); }; import ms from 'ms'; import { Level } from 'level'; import { monotonicFactory } from 'ulidx'; import { Convert, NodeStream } from '@web5/common'; import { DataStream } from '@tbd54566975/dwn-sdk-js'; import { DwnInterface } from './types/dwn.js'; import { getDwnServiceEndpointUrls, isRecordsWrite } from './utils.js'; const is2xx = (code) => code >= 200 && code <= 299; const is4xx = (code) => code >= 400 && code <= 499; export class SyncEngineLevel { constructor({ agent, dataPath, db }) { this._agent = agent; this._db = (db) ? db : new Level(dataPath !== null && dataPath !== void 0 ? dataPath : 'DATA/AGENT/SYNC_STORE'); this._ulidFactory = monotonicFactory(); } /** * Retrieves the `Web5PlatformAgent` execution context. * * @returns The `Web5PlatformAgent` instance that represents the current execution context. * @throws Will throw an error if the `agent` instance property is undefined. */ get agent() { if (this._agent === undefined) { throw new Error('SyncEngineLevel: Unable to determine agent execution context.'); } return this._agent; } set agent(agent) { this._agent = agent; } clear() { return __awaiter(this, void 0, void 0, function* () { yield this._db.clear(); }); } close() { return __awaiter(this, void 0, void 0, function* () { yield this._db.close(); }); } pull() { var _a; return __awaiter(this, void 0, void 0, function* () { const syncPeerState = yield this.getSyncPeerState({ syncDirection: 'pull' }); yield this.enqueueOperations({ syncDirection: 'pull', syncPeerState }); const pullQueue = this.getPullQueue(); const pullJobs = yield pullQueue.iterator().all(); const deleteOperations = []; const errored = new Set(); for (let job of pullJobs) { const [key] = job; const [did, dwnUrl, _, messageCid] = key.split('~'); // If a particular DWN service endpoint is unreachable, skip subsequent pull operations. if (errored.has(dwnUrl)) { continue; } const messageExists = yield this.messageExists(did, messageCid); if (messageExists) { deleteOperations.push({ type: 'del', key: key }); continue; } const messagesGet = yield this.agent.dwn.createMessage({ author: did, messageType: DwnInterface.MessagesGet, messageParams: { messageCids: [messageCid] } }); let reply; try { reply = (yield this.agent.rpc.sendDwnRequest({ dwnUrl, targetDid: did, message: messagesGet })); } catch (e) { errored.add(dwnUrl); continue; } // TODO: Refactor this to batch network requests for record messages rather than one at a time. // Per Moe, this loop exists because the original intent was to pass multiple messageCid // values to batch network requests for record messages rather than one at a time, as it // is currently implemented. Either the pull() method should be refactored to batch // getting messages OR this loop should be removed. for (let entry of (_a = reply.entries) !== null && _a !== void 0 ? _a : []) { if (entry.error || !entry.message) { yield this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); continue; } let dataStream; if (isRecordsWrite(entry)) { const { encodedData } = entry; const message = entry.message; if (encodedData) { const dataBytes = Convert.base64Url(encodedData).toUint8Array(); dataStream = DataStream.fromBytes(dataBytes); } else { const recordsRead = yield this.agent.dwn.createMessage({ author: did, messageType: DwnInterface.RecordsRead, messageParams: { filter: { recordId: message.recordId } } }); const recordsReadReply = yield this.agent.rpc.sendDwnRequest({ dwnUrl, targetDid: did, message: recordsRead.message }); const { record, status: readStatus } = recordsReadReply; if (is2xx(readStatus.code) && record) { // If the read was successful, convert the data stream from web ReadableStream // to Node.js Readable so that the DWN can process it. // TODO: Remove the type assertion once sendDwnRequest type is fixed to return a ReadableStream. dataStream = NodeStream.fromWebReadable({ readableStream: record.data }); } else if (readStatus.code >= 400) { // writes record without data, if this is an initial records write, it will succeed. const pruneReply = yield this.agent.dwn.processMessage({ targetDid: did, message }); if (pruneReply.status.code === 202 || pruneReply.status.code === 409) { yield this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); continue; } else { throw new Error(`SyncManager: Failed to sync tombstone for message '${messageCid}'`); } } } } const pullReply = yield this.agent.dwn.processMessage({ targetDid: did, message: entry.message, dataStream }); if (pullReply.status.code === 202 || pullReply.status.code === 409) { yield this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); } } } yield pullQueue.batch(deleteOperations); }); } push() { return __awaiter(this, void 0, void 0, function* () { const syncPeerState = yield this.getSyncPeerState({ syncDirection: 'push' }); yield this.enqueueOperations({ syncDirection: 'push', syncPeerState }); const pushQueue = this.getPushQueue(); const pushJobs = yield pushQueue.iterator().all(); const deleteOperations = []; const errored = new Set(); for (let job of pushJobs) { const [key] = job; const [did, dwnUrl, _, messageCid] = key.split('~'); // If a particular DWN service endpoint is unreachable, skip subsequent push operations. if (errored.has(dwnUrl)) { continue; } // Attempt to retrieve the message from the local DWN. const dwnMessage = yield this.getDwnMessage({ author: did, messageCid }); // If the message does not exist on the local DWN, remove the sync operation from the // push queue, update the push watermark for this DID/DWN endpoint combination, add the // message to the local message store, and continue to the next job. if (!dwnMessage) { deleteOperations.push({ type: 'del', key: key }); yield this.addMessage(did, messageCid); continue; } try { const reply = yield this.agent.rpc.sendDwnRequest({ dwnUrl, targetDid: did, data: dwnMessage.data, message: dwnMessage.message }); // Update the watermark and add the messageCid to the Sync Message Store if either: // - 202: message was successfully written to the remote DWN // - 409: message was already present on the remote DWN if (reply.status.code === 202 || reply.status.code === 409) { yield this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); } } catch (_a) { // Error is intentionally ignored; 'errored' set is updated with 'dwnUrl'. errored.add(dwnUrl); } } yield pushQueue.batch(deleteOperations); }); } registerIdentity({ did }) { return __awaiter(this, void 0, void 0, function* () { // Get a reference to the `registeredIdentities` sublevel. const registeredIdentities = this._db.sublevel('registeredIdentities'); // Add (or overwrite, if present) the Identity's DID as a registered identity. yield registeredIdentities.put(did, ''); }); } startSync({ interval }) { // Convert the interval string to milliseconds. const intervalMilliseconds = ms(interval); return new Promise((resolve, reject) => { const intervalSync = () => __awaiter(this, void 0, void 0, function* () { if (this._syncIntervalId) { clearInterval(this._syncIntervalId); } try { yield this.push(); yield this.pull(); } catch (error) { this.stopSync(); reject(error); } // then we start sync again this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); }); this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); }); } stopSync() { if (this._syncIntervalId) { clearInterval(this._syncIntervalId); this._syncIntervalId = undefined; } } enqueueOperations({ syncDirection, syncPeerState }) { return __awaiter(this, void 0, void 0, function* () { for (let syncState of syncPeerState) { // Get the event log from the remote DWN if pull sync, or local DWN if push sync. const eventLog = yield this.getDwnEventLog({ did: syncState.did, dwnUrl: syncState.dwnUrl, cursor: syncState.cursor, syncDirection }); const syncOperations = []; for (let messageCid of eventLog) { const watermark = this._ulidFactory(); // Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue. // Note: It is critical that `watermark` precedes `messageCid` to ensure that when the sync // jobs are pulled off the queue, they are lexographically sorted oldest to newest. const operationKey = [ syncState.did, syncState.dwnUrl, watermark, messageCid ].join('~'); syncOperations.push({ type: 'put', key: operationKey, value: '' }); } if (syncOperations.length > 0) { const syncQueue = (syncDirection === 'pull') ? this.getPullQueue() : this.getPushQueue(); yield syncQueue.batch(syncOperations); } } }); } getDwnEventLog({ did, dwnUrl, syncDirection, cursor }) { var _a; return __awaiter(this, void 0, void 0, function* () { let eventsReply = {}; if (syncDirection === 'pull') { // When sync is a pull, get the event log from the remote DWN. const eventsGetMessage = yield this.agent.dwn.createMessage({ author: did, messageType: DwnInterface.EventsGet, messageParams: { cursor } }); try { eventsReply = (yield this.agent.rpc.sendDwnRequest({ dwnUrl: dwnUrl, targetDid: did, message: eventsGetMessage })); } catch (_b) { // If a particular DWN service endpoint is unreachable, silently ignore. } } else if (syncDirection === 'push') { // When sync is a push, get the event log from the local DWN. const eventsGetDwnResponse = yield this.agent.dwn.processRequest({ author: did, target: did, messageType: DwnInterface.EventsGet, messageParams: { cursor } }); eventsReply = eventsGetDwnResponse.reply; } const eventLog = (_a = eventsReply.entries) !== null && _a !== void 0 ? _a : []; if (eventsReply.cursor) { this.setCursor(did, dwnUrl, syncDirection, eventsReply.cursor); } return eventLog; }); } getDwnMessage({ author, messageCid }) { return __awaiter(this, void 0, void 0, function* () { let { reply } = yield this.agent.dwn.processRequest({ author: author, target: author, messageType: DwnInterface.MessagesGet, messageParams: { messageCids: [messageCid] } }); // Absence of a messageEntry or message within messageEntry can happen because updating a // Record creates another RecordsWrite with the same recordId. Only the first and // most recent RecordsWrite messages are kept for a given recordId. Any RecordsWrite messages // that aren't the first or most recent are discarded by the DWN. if (!(reply.entries && reply.entries.length === 1)) { return undefined; } const [messageEntry] = reply.entries; const message = messageEntry.message; if (!message) { return undefined; } let dwnMessageWithBlob = { message }; // If the message is a RecordsWrite, either data will be present, // OR we have to fetch it using a RecordsRead. if (isRecordsWrite(messageEntry)) { if (messageEntry.encodedData) { const dataBytes = Convert.base64Url(messageEntry.encodedData).toUint8Array(); // ! TODO: test adding the messageEntry.message.descriptor.dataFormat to the Blob constructor. dwnMessageWithBlob.data = new Blob([dataBytes]); } else { let readResponse = yield this.agent.dwn.processRequest({ author: author, target: author, messageType: DwnInterface.RecordsRead, messageParams: { filter: { recordId: messageEntry.message.recordId } } }); const reply = readResponse.reply; if (is2xx(reply.status.code) && reply.record) { // If status code is 200-299, return the data. dwnMessageWithBlob.data = yield NodeStream.consumeToBlob({ readable: reply.record.data }); } else if (is4xx(reply.status.code)) { // If status code is 400-499, typically 404 indicating the data no longer exists, it is // likely that a `RecordsDelete` took place. `RecordsDelete` keeps a `RecordsWrite` and // deletes the associated data, effectively acting as a "tombstone." Sync still needs to // push this tombstone so that the `RecordsDelete` can be processed successfully. } else { // If status code is anything else (likely 5xx), throw an error. const { status: { code, detail } } = reply; throw new Error(`SyncEngineLevel: (${code}) Failed to read data associated with record ${messageEntry.message.recordId}. ${detail}}`); } } } return dwnMessageWithBlob; }); } getSyncPeerState({ syncDirection }) { return __awaiter(this, void 0, void 0, function* () { // Get a list of the DIDs of all registered identities. const registeredIdentities = yield this._db.sublevel('registeredIdentities').keys().all(); // Array to accumulate the list of sync peers for each DID. const syncPeerState = []; for (let did of registeredIdentities) { // First, confirm the DID can be resolved and extract the DWN service endpoint URLs. const dwnEndpointUrls = yield getDwnServiceEndpointUrls(did, this.agent.did); if (dwnEndpointUrls.length === 0) { // Silently ignore and do not try to perform Sync for any DID that does not have a DWN // service endpoint published in its DID document. continue; } // Get the cursor (or undefined) for each (DID, DWN service endpoint, sync direction) // combination and add it to the sync peer state array. for (let dwnUrl of dwnEndpointUrls) { const cursor = yield this.getCursor(did, dwnUrl, syncDirection); syncPeerState.push({ did, dwnUrl, cursor }); } } return syncPeerState; }); } getCursor(did, dwnUrl, direction) { return __awaiter(this, void 0, void 0, function* () { const cursorKey = `${did}~${dwnUrl}~${direction}`; const cursorsStore = this.getCursorStore(); try { const cursorValue = yield cursorsStore.get(cursorKey); if (cursorValue) { return JSON.parse(cursorValue); } } catch (error) { // Don't throw when a key wasn't found. if (error.notFound) { return undefined; } } }); } setCursor(did, dwnUrl, direction, cursor) { return __awaiter(this, void 0, void 0, function* () { const cursorKey = `${did}~${dwnUrl}~${direction}`; const cursorsStore = this.getCursorStore(); yield cursorsStore.put(cursorKey, JSON.stringify(cursor)); }); } /** * The message store is used to prevent "echoes" that occur during a sync pull operation. * After a message is confirmed to already be synchronized on the local DWN, its CID is added * to the message store to ensure that any subsequent pull attempts are skipped. */ messageExists(did, messageCid) { return __awaiter(this, void 0, void 0, function* () { const messageStore = this.getMessageStore(did); // If the `messageCid` exists in this DID's store, return true. Otherwise, return false. try { yield messageStore.get(messageCid); return true; } catch (error) { if (error.notFound) { return false; } throw error; } }); } addMessage(did, messageCid) { return __awaiter(this, void 0, void 0, function* () { const messageStore = this.getMessageStore(did); return yield messageStore.put(messageCid, ''); }); } getMessageStore(did) { return this._db.sublevel('history').sublevel(did).sublevel('messages'); } getCursorStore() { return this._db.sublevel('cursors'); } getPushQueue() { return this._db.sublevel('pushQueue'); } getPullQueue() { return this._db.sublevel('pullQueue'); } } //# sourceMappingURL=sync-engine-level.js.map