Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Sync Interface #222

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/tests-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
- name: Run tests for all packages
run: npm run test:node --ws -- --color
env:
TEST_DWN_URL: http://localhost:3000
TEST_DWN_URLS: http://localhost:3000

- name: Upload test coverage to Codecov
uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4
Expand Down Expand Up @@ -106,4 +106,4 @@ jobs:
- name: Run tests for all packages
run: npm run test:browser --ws -- --color
env:
TEST_DWN_URL: http://localhost:3000
TEST_DWN_URLS: http://localhost:3000
2 changes: 1 addition & 1 deletion packages/agent/karma.conf.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module.exports = function (config) {
timeout: 10000 // 10 seconds
},
// If an environment variable is defined, override the default test DWN URL.
testDwnUrl: process.env.TEST_DWN_URL,
testDwnUrls: process.env.TEST_DWN_URLS,
},

// list of files / patterns to load in the browser
Expand Down
225 changes: 123 additions & 102 deletions packages/agent/src/sync-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { BatchOperation } from 'level';
import type {
Event,
EventsGetReply,
GenericMessage,
MessagesGetReply,
Expand All @@ -16,13 +17,28 @@ import type { Web5ManagedAgent } from './types/agent.js';

import { webReadableToIsomorphicNodeReadable } from './utils.js';

// arbitrary number for now, but we should enforce some sane minimum
export const MIN_SYNC_INTERVAL = 3000;

type SyncDirection = 'pull' | 'push';

interface SyncOptions {
interval?: number
direction?: SyncDirection
}

export interface SyncManager {
agent: Web5ManagedAgent;
registerIdentity(options: { did: string }): Promise<void>;
startSync(options: { interval: number }): Promise<void>;

// sync will run the sync operation once.
// if a direction is passed, it will only sync in that direction.
sync(direction?: SyncDirection): Promise<void>;

// startSync will run sync on an interval
// if a direction is provided, it will only sync in that direction.
startSync(options?: SyncOptions): Promise<void>;
stopSync(): void;
push(): Promise<void>;
pull(): Promise<void>;
}

export type SyncManagerOptions = {
Expand All @@ -31,12 +47,11 @@ export type SyncManagerOptions = {
db?: Level;
};

type SyncDirection = 'push' | 'pull';

type SyncState = {
did: string;
dwnUrl: string;
watermark: string | undefined;
pullWatermark?: string;
pushWatermark?: string;
}

type DwnMessage = {
Expand Down Expand Up @@ -93,10 +108,7 @@ export class SyncManagerLevel implements SyncManager {
await this._db.clear();
}

public async pull(): Promise<void> {
const syncPeerState = await this.getSyncPeerState({ syncDirection: 'pull' });
await this.enqueueOperations({ syncDirection: 'pull', syncPeerState });

private async pull(): Promise<void> {
const pullQueue = this.getPullQueue();
const pullJobs = await pullQueue.iterator().all();

Expand Down Expand Up @@ -171,7 +183,9 @@ export class SyncManagerLevel implements SyncManager {
author : did,
messageType : 'RecordsRead',
messageOptions : {
recordId: message['recordId']
filter: {
recordId: message['recordId']
}
}
});

Expand Down Expand Up @@ -224,10 +238,7 @@ export class SyncManagerLevel implements SyncManager {
await pullQueue.batch(deleteOperations as any);
}

public async push(): Promise<void> {
const syncPeerState = await this.getSyncPeerState({ syncDirection: 'push' });
await this.enqueueOperations({ syncDirection: 'push', syncPeerState });

private async push(): Promise<void> {
const pushQueue = this.getPushQueue();
const pushJobs = await pushQueue.iterator().all();

Expand Down Expand Up @@ -293,25 +304,21 @@ export class SyncManagerLevel implements SyncManager {
await registeredIdentities.put(did, '');
}

public startSync(options: {
interval: number
}): Promise<void> {
const { interval = 120_000 } = options;

public startSync(options: SyncOptions = {}): Promise<void> {
const { interval = MIN_SYNC_INTERVAL, direction } = options;
return new Promise((resolve, reject) => {
if (this._syncIntervalId) {
clearInterval(this._syncIntervalId);
}

this._syncIntervalId = setInterval(async () => {
try {
await this.push();
await this.pull();
await this.sync(direction);
} catch (error) {
this.stopSync();
reject(error);
}
}, interval);
}, interval >= MIN_SYNC_INTERVAL ? interval : MIN_SYNC_INTERVAL);
});
}

Expand All @@ -322,97 +329,104 @@ export class SyncManagerLevel implements SyncManager {
}
}

private async enqueueOperations(options: {
syncDirection: SyncDirection,
syncPeerState: SyncState[]
}) {
const { syncDirection, syncPeerState } = options;

for (let syncState of syncPeerState) {
// Get the event log from the remote DWN if pull sync, or local DWN if push sync.
const eventLog = await this.getDwnEventLog({
did : syncState.did,
dwnUrl : syncState.dwnUrl,
syncDirection,
watermark : syncState.watermark
});
public async sync(direction?: SyncDirection): Promise<void> {
await this.enqueueOperations(direction);
// enqueue operations handles the direction logic.
// we can just run both operations and only enqueued events will sync.
await Promise.all([
this.push(), this.pull()
]);
}

const syncOperations: DbBatchOperation[] = [];
private createOperationKey(did: string, dwnUrl: string, watermark: string, messageCid: string): string {
return [did, dwnUrl, watermark, messageCid].join('~');
}

for (let event of eventLog) {
/** Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue.
* Note: It is critical that `watermark` precedes `messageCid` to
* ensure that when the sync jobs are pulled off the queue, they
* are lexographically sorted oldest to newest. */
const operationKey = [
syncState.did,
syncState.dwnUrl,
event.watermark,
event.messageCid
].join('~');
private dbBatchOperationPut(did: string, dwnUrl: string, watermark: string, messageCid: string): DbBatchOperation {
const key = this.createOperationKey(did, dwnUrl, watermark, messageCid);
return { type: 'put', key, value: '' };
}

const operation: DbBatchOperation = { type: 'put', key: operationKey, value: '' };
/**
* Enqueues the operations needed for sync based on the supplied direction.
*
* @param direction the optional direction in which you would like to enqueue sync events for.
* If no direction is supplied it will sync in both directions.
*/
async enqueueOperations(direction?: SyncDirection) {
const syncPeerState = await this.getSyncPeerState();

syncOperations.push(operation);
for (let syncState of syncPeerState) {
const batchPromises = [];
if (direction === undefined || direction === 'push') {
const localEventsPromise = this.getLocalDwnEvents({
did : syncState.did,
watermark : syncState.pushWatermark,
});
batchPromises.push(this.batchOperations('push', localEventsPromise, syncState));
}

if (syncOperations.length > 0) {
const syncQueue = (syncDirection === 'pull')
? this.getPullQueue()
: this.getPushQueue();
await syncQueue.batch(syncOperations as any);
if(direction === undefined || direction === 'pull') {
const remoteEventsPromise = this.getRemoteEvents({
did : syncState.did,
dwnUrl : syncState.dwnUrl,
watermark : syncState.pullWatermark,
});
batchPromises.push(this.batchOperations('pull', remoteEventsPromise, syncState));
}
await Promise.all(batchPromises);
}
}

private async getDwnEventLog(options: {
did: string,
dwnUrl: string,
syncDirection: SyncDirection,
watermark?: string
}) {
const { did, dwnUrl, syncDirection, watermark } = options;
private async batchOperations(direction: SyncDirection, eventsPromise: Promise<Event[]>, syncState: SyncState): Promise<void> {
const { did, dwnUrl } = syncState;
const operations: DbBatchOperation[] = [];
(await eventsPromise).forEach(e => operations.push(this.dbBatchOperationPut(did, dwnUrl, e.watermark, e.messageCid)));
return direction === 'pull' ? this.getPullQueue().batch(operations as any) : this.getPushQueue().batch(operations as any);
}

private async getLocalDwnEvents(options:{ did: string, watermark?: string }) {
const { did, watermark } = options;
let eventsReply = {} as EventsGetReply;
({ reply: eventsReply } = await this.agent.dwnManager.processRequest({
author : did,
target : did,
messageType : 'EventsGet',
messageOptions : { watermark }
}));

return eventsReply.events ?? [];
}

if (syncDirection === 'pull') {
// When sync is a pull, get the event log from the remote DWN.
const eventsGetMessage = await this.agent.dwnManager.createMessage({
author : did,
messageType : 'EventsGet',
messageOptions : { watermark }
});
private async getRemoteEvents(options: { did: string, dwnUrl: string, watermark?: string }) {
const { did, dwnUrl, watermark } = options;

try {
eventsReply = await this.agent.rpcClient.sendDwnRequest({
dwnUrl : dwnUrl,
targetDid : did,
message : eventsGetMessage
});
} catch {
// If a particular DWN service endpoint is unreachable, silently ignore.
}
let eventsReply = {} as EventsGetReply;

} else if (syncDirection === 'push') {
// When sync is a push, get the event log from the local DWN.
({ reply: eventsReply } = await this.agent.dwnManager.processRequest({
author : did,
target : did,
messageType : 'EventsGet',
messageOptions : { watermark }
}));
}
const eventsGetMessage = await this.agent.dwnManager.createMessage({
author : did,
messageType : 'EventsGet',
messageOptions : { watermark }
});

const eventLog = eventsReply.events ?? [];
try {
eventsReply = await this.agent.rpcClient.sendDwnRequest({
dwnUrl : dwnUrl,
targetDid : did,
message : eventsGetMessage
});
} catch {
// If a particular DWN service endpoint is unreachable, silently ignore.
}

return eventLog;
return eventsReply.events ?? [];
}

private async getDwnMessage(
author: string,
messageCid: string
): Promise<DwnMessage | undefined> {
let messagesGetResponse = await this.agent.dwnManager.processRequest({
const messagesGetResponse = await this.agent.dwnManager.processRequest({
author : author,
target : author,
messageType : 'MessagesGet',
Expand Down Expand Up @@ -455,7 +469,9 @@ export class SyncManagerLevel implements SyncManager {
target : author,
messageType : 'RecordsRead',
messageOptions : {
recordId: writeMessage.recordId
filter: {
recordId: writeMessage.recordId
}
}
});
const reply = readResponse.reply as RecordsReadReply;
Expand All @@ -482,11 +498,7 @@ export class SyncManagerLevel implements SyncManager {
return dwnMessage;
}

private async getSyncPeerState(options: {
syncDirection: SyncDirection
}): Promise<SyncState[]> {
const { syncDirection } = options;

private async getSyncPeerState(): Promise<SyncState[]> {
// Get a list of the DIDs of all registered identities.
const registeredIdentities = await this._db.sublevel('registeredIdentities').keys().all();

Expand Down Expand Up @@ -521,35 +533,44 @@ export class SyncManagerLevel implements SyncManager {
/** Get the watermark (or undefined) for each (DID, DWN service endpoint, sync direction)
* combination and add it to the sync peer state array. */
for (let dwnUrl of service.serviceEndpoint.nodes) {
const watermark = await this.getWatermark(did, dwnUrl, syncDirection);
syncPeerState.push({ did, dwnUrl, watermark });
try {
const syncState = await this.getSyncState(did, dwnUrl);
syncPeerState.push(syncState);
} catch(error) {
// go onto next peer if this fails
}
}
}

return syncPeerState;
}

private async getWatermark(did: string, dwnUrl: string, direction: SyncDirection) {
private async getWatermark(did: string, dwnUrl: string, direction: SyncDirection): Promise<string|undefined> {
const wmKey = `${did}~${dwnUrl}~${direction}`;
const watermarkStore = this.getWatermarkStore();

try {
return await watermarkStore.get(wmKey);
} catch(error: any) {
// Don't throw when a key wasn't found.
if (error.notFound) {
return undefined;
}
throw new Error('SyncManager: invalid watermark store');
}
}

private async setWatermark(did: string, dwnUrl: string, direction: SyncDirection, watermark: string) {
const wmKey = `${did}~${dwnUrl}~${direction}`;
const watermarkStore = this.getWatermarkStore();

await watermarkStore.put(wmKey, watermark);
}

private async getSyncState(did: string, dwnUrl: string): Promise<SyncState> {
const pullWatermark = await this.getWatermark(did, dwnUrl, 'pull');
const pushWatermark = await this.getWatermark(did, dwnUrl, 'push');
return { did, dwnUrl, pullWatermark, pushWatermark };
}

/**
* The message store is used to prevent "echoes" that occur during a sync pull operation.
* After a message is confirmed to already be synchronized on the local DWN, its CID is added
Expand Down
Loading