Skip to content

Commit

Permalink
Some more clean up for import/store (#391)
Browse files Browse the repository at this point in the history
* simplification of records class

* rename record options to reflect signing/storing
  • Loading branch information
LiranCohen authored Jan 31, 2024
1 parent e36b35e commit 4d1ada1
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 148 deletions.
2 changes: 1 addition & 1 deletion packages/agent/src/dwn-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ export class DwnManager {
});

if (dwnMessageConstructor === RecordsWrite){
if (request.import) {
if (request.signAsOwner) {
await (dwnMessage as RecordsWrite).signAsOwner(dwnSigner);
}

Check warning on line 288 in packages/agent/src/dwn-manager.ts

View check run for this annotation

Codecov / codecov/patch

packages/agent/src/dwn-manager.ts#L287-L288

Added lines #L287 - L288 were not covered by tests
}
Expand Down
2 changes: 1 addition & 1 deletion packages/agent/src/types/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export type ProcessDwnRequest = DwnRequest & {
rawMessage?: unknown;
messageOptions?: unknown;
store?: boolean;
import?: boolean;
signAsOwner?: boolean;
};

export type SendDwnRequest = DwnRequest & (ProcessDwnRequest | { messageCid: string })
Expand Down
274 changes: 128 additions & 146 deletions packages/api/src/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,31 @@ import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';
import type { ResponseStatus } from './dwn-api.js';
import { dataToBlob } from './utils.js';

type ProcessRecordRequest = {
dataStream?: Blob | ReadableStream | Readable;
rawMessage?: Partial<RecordsWriteMessage>;
messageOptions?: unknown;
store: boolean;
import: boolean;
};
class SendCache {
private static cache = new Map();
static sendCacheLimit = 100;

static set(id: string, target: string) {
let targetCache = SendCache.cache.get(id) || new Set();
SendCache.cache.delete(id);
SendCache.cache.set(id, targetCache);
if (this.cache.size > SendCache.sendCacheLimit) {
const firstRecord = SendCache.cache.keys().next().value;
SendCache.cache.delete(firstRecord);
}

Check warning on line 26 in packages/api/src/record.ts

View check run for this annotation

Codecov / codecov/patch

packages/api/src/record.ts#L24-L26

Added lines #L24 - L26 were not covered by tests
targetCache.delete(target);
targetCache.add(target);
if (targetCache.size > SendCache.sendCacheLimit) {
const firstTarget = targetCache.keys().next().value;
targetCache.delete(firstTarget);
}

Check warning on line 32 in packages/api/src/record.ts

View check run for this annotation

Codecov / codecov/patch

packages/api/src/record.ts#L30-L32

Added lines #L30 - L32 were not covered by tests
}

static check(id: string, target: string){
let targetCache = SendCache.cache.get(id);
return target && targetCache ? targetCache.has(target) : targetCache;
}
}

/**
* Options that are passed to Record constructor.
Expand Down Expand Up @@ -78,6 +96,10 @@ export type RecordUpdateOptions = {
* @beta
*/
export class Record implements RecordModel {
// Cache to minimize the amount of redundant two-phase commits we do in store() and send()
// Retains awareness of the last 100 records stored/sent for up to 100 target DIDs each.
private static _sendCache = SendCache;

// Record instance metadata.
private _agent: Web5Agent;
private _connectedDid: string;
Expand All @@ -93,36 +115,11 @@ export class Record implements RecordModel {
private _descriptor: RecordsWriteDescriptor;
private _encryption?: RecordsWriteMessage['encryption'];
private _initialWrite: RecordOptions['initialWrite'];
private _initialWriteStored: boolean;
private _initialWriteProcessed: boolean;
private _recordId: string;
private _protocolRole: RecordOptions['protocolRole'];
// Getters for immutable DWN Record properties.

// Cache to minimize the amount of redundant two-phase commits we do in store() and send()
// Retains awareness of the last 100 records stored/sent for up to 100 target DIDs each.
static sendCache = new Map();
static sendCacheLimit = 100;
static setSendCache(recordId, target) {
const recordCache = Record.sendCache;
let targetCache = recordCache.get(recordId) || new Set();
recordCache.delete(recordId);
recordCache.set(recordId, targetCache);
if (recordCache.size > Record.sendCacheLimit) {
const firstRecord = recordCache.keys().next().value;
recordCache.delete(firstRecord);
}
targetCache.delete(target);
targetCache.add(target);
if (targetCache.size > Record.sendCacheLimit) {
const firstTarget = targetCache.keys().next().value;
targetCache.delete(firstTarget);
}
}
static checkSendCache(recordId, target){
let targetCache = Record.sendCache.get(recordId);
return target && targetCache ? targetCache.has(target) : targetCache;
}

/** Record's signatures attestation */
get attestation(): RecordsWriteMessage['attestation'] { return this._attestation; }

Expand Down Expand Up @@ -193,6 +190,23 @@ export class Record implements RecordModel {
/** Record's published status (true/false) */
get published() { return this._descriptor.published; }

/**
* Returns a copy of the raw `RecordsWriteMessage` that was used to create the current `Record` instance.
*/
private get rawMessage(): RecordsWriteMessage {
const message = JSON.parse(JSON.stringify({
contextId : this._contextId,
recordId : this._recordId,
descriptor : this._descriptor,
attestation : this._attestation,
authorization : this._authorization,
encryption : this._encryption,
}));

removeUndefinedProperties(message);
return message;
}

constructor(agent: Web5Agent, options: RecordOptions) {

this._agent = agent;
Expand Down Expand Up @@ -346,100 +360,32 @@ export class Record implements RecordModel {
return dataObj;
}

private _prepareMessage(options: ProcessRecordRequest): ProcessDwnRequest {
const request: ProcessDwnRequest = {
messageType : DwnInterfaceName.Records + DwnMethodName.Write,
author : this._connectedDid,
target : this._connectedDid,
import : options.import,
store : options.store,
};

if (options.rawMessage) {
removeUndefinedProperties(options.rawMessage);
request.rawMessage = options.rawMessage as Partial<RecordsWriteMessage>;
}
else {
request.messageOptions = options.messageOptions;
}

if (options.dataStream) {
request.dataStream = options.dataStream;
}

return request;
}

// Handles the various conditions around there being an initial write, whether to store initial/current state,
// and whether to add an owner signature to the initial write to enable storage when protocol rules require it.
private async _processRecord(options: { store: boolean, import: boolean }): Promise<ResponseStatus> {

const { store = true, import: _import = false } = options;
const initialWrite = this._initialWrite;

// Is there an initial write? Have we already stored it?
if (initialWrite && !this._initialWriteStored) {
// There is an initial write, and we have not dealt with it before, so prepare to do so
const requestOptions = this._prepareMessage({
import : _import,
store : store,
rawMessage : {
contextId: this._contextId,
...initialWrite
}
});

// Process the prepared initial write, with the options set for storing and/or importing with an owner sig.
const agentResponse = await this._agent.processDwnRequest(requestOptions);
const { message, reply: { status } } = agentResponse;
const responseMessage = message as RecordsWriteMessage;

// If we are importing, make sure to update the initial write's authorization, because now it will have the owner's signature on it
if (200 <= status.code && status.code <= 299) {
this._initialWriteStored = true;
if (_import) initialWrite.authorization = responseMessage.authorization;
}
}

const requestOptions = this._prepareMessage({
import : !initialWrite && _import, // if there is no initial write, this is the initial write (or having a forced import sig), so sign it.
store : store,
dataStream : await this.data.blob(),
rawMessage : {
contextId : this._contextId,
recordId : this._recordId,
descriptor : this._descriptor,
attestation : this._attestation,
authorization : this._authorization,
encryption : this._encryption,
}
});

const agentResponse = await this._agent.processDwnRequest(requestOptions);
const { message, reply: { status } } = agentResponse;
const responseMessage = message as RecordsWriteMessage;

if (200 <= status.code && status.code <= 299) {
// If we are importing, make sure to update the current record state's authorization, because now it will have the owner's signature on it.
if (_import) this._authorization = responseMessage.authorization;
}

return { status };
}

// Uses _processRecord to manifest the storage-centric features of committing a foreign record to the local DWN
async store(options?: { import: boolean }): Promise<ResponseStatus> {
// process the record and always set store to true
return this._processRecord({ ...options, store: true });
/**
* Stores the current record state as well as any initial write to the owner's DWN.
*
* @param importRecord - if true, the record will signed by the owner before storing it to the owner's DWN. Defaults to true.
* @returns the status of the store request
*
* @beta
*/
async store(importRecord: boolean = true): Promise<ResponseStatus> {
// if we are importing the record we sign it as the owner
return this.processRecord({ signAsOwner: importRecord, store: true });
}

// Uses _processRecord to manifest the import-centric features of ingesting and signing a foreign record
async import(options?: { store: boolean }): Promise<ResponseStatus> {
// process the record and always set import to true, only skip storage if explicitly set to false
return this._processRecord({ store: options?.store !== false, import: true });
/**
* Signs the current record state as well as any initial write and optionally stores it to the owner's DWN.
* This is useful when importing a record that was signed by someone else int your own DWN.
*
* @param store - if true, the record will be stored to the owner's DWN after signing. Defaults to true.
* @returns the status of the import request
*
* @beta
*/
async import(store: boolean = true): Promise<ResponseStatus> {
return this.processRecord({ store, signAsOwner: true });
}


/**
* Send the current record to a remote DWN by specifying their DID
* If no DID is specified, the target is assumed to be the owner (connectedDID).
Expand All @@ -456,10 +402,9 @@ export class Record implements RecordModel {
target??= this._connectedDid;

// Is there an initial write? Do we know if we've already sent it to this target?
if (initialWrite && !Record.checkSendCache(this._recordId, target)){
if (initialWrite && !Record._sendCache.check(this._recordId, target)){
// We do have an initial write, so prepare it for sending to the target.
const rawMessage = {
contextId: this._contextId,
...initialWrite
};
removeUndefinedProperties(rawMessage);
Expand All @@ -473,7 +418,7 @@ export class Record implements RecordModel {
await this._agent.sendDwnRequest(initialState);

// Set the cache to maintain awareness that we don't need to send the initial write next time.
Record.setSendCache(this._recordId, target);
Record._sendCache.set(this._recordId, target);
}

// Prepare the current state for sending to the target
Expand All @@ -486,16 +431,7 @@ export class Record implements RecordModel {

// if there is already an authz payload, just pass along the record
if (this._authorization) {
const rawMessage = {
contextId : this._contextId,
recordId : this._recordId,
descriptor : this._descriptor,
attestation : this._attestation,
authorization : this._authorization,
encryption : this._encryption,
};
removeUndefinedProperties(rawMessage);
latestState.rawMessage = rawMessage;
latestState.rawMessage = { ...this.rawMessage };
} else {
// if there is no authz, pass options so the DWN SDK can construct and sign the record
latestState.messageOptions = this.toJSON();
Expand Down Expand Up @@ -618,24 +554,18 @@ export class Record implements RecordModel {
const responseMessage = message as RecordsWriteMessage;

if (200 <= status.code && status.code <= 299) {
// copy the original raw message to the initial write before we update the values.
if (!this._initialWrite) {
const initialWrite: RecordsWriteMessage = {
contextId : this._contextId,
recordId : this._recordId,
descriptor : this._descriptor,
attestation : this._attestation,
authorization : this._authorization,
encryption : this._encryption,
};
removeUndefinedProperties(initialWrite);
this._initialWrite = JSON.parse(JSON.stringify(initialWrite));
this._initialWrite = { ...this.rawMessage };
}

// Only update the local Record instance mutable properties if the record was successfully (over)written.
this._authorization = responseMessage.authorization;
this._protocolRole = messageOptions.protocolRole;
mutableDescriptorProperties.forEach(property => {
this._descriptor[property] = responseMessage.descriptor[property];
});

// Cache data.
if (options.data !== undefined) {
this._encodedData = dataBlob;
Expand All @@ -645,6 +575,58 @@ export class Record implements RecordModel {
return { status };
}

// Handles the various conditions around there being an initial write, whether to store initial/current state,
// and whether to add an owner signature to the initial write to enable storage when protocol rules require it.
private async processRecord(options: { store: boolean, signAsOwner: boolean }): Promise<ResponseStatus> {
const { store, signAsOwner } = options;

// if there is an initial write and we haven't already processed it, we first process it and marked it as such.
if (this._initialWrite && !this._initialWriteProcessed) {
const initialWriteRequest: ProcessDwnRequest = {
messageType : DwnInterfaceName.Records + DwnMethodName.Write,
rawMessage : this.initialWrite,
author : this._connectedDid,
target : this._connectedDid,
signAsOwner : signAsOwner,
store,
};

// Process the prepared initial write, with the options set for storing and/or signing as the owner.
const agentResponse = await this._agent.processDwnRequest(initialWriteRequest);
const { message, reply: { status } } = agentResponse;
const responseMessage = message as RecordsWriteMessage;

// If we are signing as owner, make sure to update the initial write's authorization, because now it will have the owner's signature on it
// set it to processed so that we don't repeat this process again
if (200 <= status.code && status.code <= 299) {
this._initialWriteProcessed = true;
if (signAsOwner) this.initialWrite.authorization = responseMessage.authorization;
}

Check warning on line 604 in packages/api/src/record.ts

View check run for this annotation

Codecov / codecov/patch

packages/api/src/record.ts#L602-L604

Added lines #L602 - L604 were not covered by tests
}

// Now that we've processed a potential initial write, we can process the current record state.
const requestOptions: ProcessDwnRequest = {
messageType : DwnInterfaceName.Records + DwnMethodName.Write,
rawMessage : this.rawMessage,
author : this._connectedDid,
target : this._connectedDid,
dataStream : await this.data.blob(),
signAsOwner : !this.initialWrite && signAsOwner, // we only need to sign this record if it is the initial write and is marked for signing
store,
};

const agentResponse = await this._agent.processDwnRequest(requestOptions);
const { message, reply: { status } } = agentResponse;
const responseMessage = message as RecordsWriteMessage;

if (200 <= status.code && status.code <= 299) {
// If we are signing as the owner, make sure to update the current record state's authorization, because now it will have the owner's signature on it.
if (signAsOwner) this._authorization = responseMessage.authorization;
}

return { status };
}

/**
* Fetches the record's data from the specified DWN.
*
Expand Down

0 comments on commit 4d1ada1

Please sign in to comment.