Skip to content

Commit

Permalink
Allow initial RecordsWrite without data to be written to the DWN. (#629)
Browse files Browse the repository at this point in the history
* allow writing initial records write without data

* update comments

* refactor write without data flow

* remove unused DwnError

* add anther tests for data size difference

* remove validation code that was only valid for the removed synchronizePrunedInitialRecordsWrite method

* modify comment and bring public method up in the file

* update test comments

* Refactoring + docoumentation (#635)

---------

Co-authored-by: Henry Tsai <[email protected]>
  • Loading branch information
LiranCohen and thehenrytsai authored Dec 1, 2023
1 parent 641888e commit 9c0fdb5
Show file tree
Hide file tree
Showing 6 changed files with 496 additions and 302 deletions.
2 changes: 1 addition & 1 deletion src/core/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ export enum DwnErrorCode {
RecordsWriteGetEntryIdUndefinedAuthor = 'RecordsWriteGetEntryIdUndefinedAuthor',
RecordsWriteGetInitialWriteNotFound = 'RecordsWriteGetInitialWriteNotFound',
RecordsWriteImmutablePropertyChanged = 'RecordsWriteImmutablePropertyChanged',
RecordsWriteMissingAuthorizationSigner = 'RecordsWriteMissingAuthorizationSigner',
RecordsWriteMissingSigner = 'RecordsWriteMissingSigner',
RecordsWriteMissingDataInPrevious = 'RecordsWriteMissingDataInPrevious',
RecordsWriteMissingEncodedDataInPrevious = 'RecordsWriteMissingEncodedDataInPrevious',
RecordsWriteMissingDataAssociation = 'RecordsWriteMissingDataAssociation',
RecordsWriteMissingDataStream = 'RecordsWriteMissingDataStream',
RecordsWriteMissingProtocol = 'RecordsWriteMissingProtocol',
Expand Down
34 changes: 0 additions & 34 deletions src/dwn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import type { GenericMessage } from './types/message-types.js';
import type { MessageStore } from './types/message-store.js';
import type { MethodHandler } from './types/method-handler.js';
import type { Readable } from 'readable-stream';
import type { RecordsWriteHandlerOptions } from './handlers/records-write.js';
import type { TenantGate } from './core/tenant-gate.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply } from './types/event-types.js';
import type { GenericMessageReply, UnionMessageReply } from './core/message-reply.js';
Expand Down Expand Up @@ -125,26 +124,6 @@ export class Dwn {
return methodHandlerReply;
}

/**
* Privileged method for writing a pruned initial `RecordsWrite` to a DWN without needing to supply associated data.
*/
public async synchronizePrunedInitialRecordsWrite(tenant: string, message: RecordsWriteMessage): Promise<GenericMessageReply> {
const errorMessageReply =
await this.validateTenant(tenant) ??
await this.validateMessageIntegrity(message, DwnInterfaceName.Records, DwnMethodName.Write);
if (errorMessageReply !== undefined) {
return errorMessageReply;
}

const options: RecordsWriteHandlerOptions = {
skipDataStorage: true,
};

const handler = new RecordsWriteHandler(this.didResolver, this.messageStore, this.dataStore, this.eventLog);
const methodHandlerReply = await handler.handle({ tenant, message, options });
return methodHandlerReply;
}

/**
* Checks tenant gate to see if tenant is allowed.
* @param tenant The tenant DID to route the given message to.
Expand All @@ -169,8 +148,6 @@ export class Dwn {
*/
public async validateMessageIntegrity(
rawMessage: any,
expectedInterface?: DwnInterfaceName,
expectedMethod?: DwnMethodName,
): Promise<GenericMessageReply | undefined> {
// Verify interface and method
const dwnInterface = rawMessage?.descriptor?.interface;
Expand All @@ -181,17 +158,6 @@ export class Dwn {
};
}

if (expectedInterface !== undefined && expectedInterface !== dwnInterface) {
return {
status: { code: 400, detail: `Expected interface ${expectedInterface}, received ${dwnInterface}` }
};
}
if (expectedMethod !== undefined && expectedMethod !== dwnMethod) {
return {
status: { code: 400, detail: `Expected method ${expectedInterface}${expectedMethod}, received ${dwnInterface}${dwnMethod}` }
};
}

// validate message structure
try {
// consider to push this down to individual handlers
Expand Down
215 changes: 122 additions & 93 deletions src/handlers/records-write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { EventLog } from '../types/event-log.js';
import type { GenericMessageReply } from '../core/message-reply.js';
import type { MessageStore } from '../types//message-store.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { RecordsDeleteMessage, RecordsWriteMessage, RecordsWriteMessageWithOptionalEncodedData } from '../types/records-types.js';
import type { RecordsWriteMessage, RecordsWriteMessageWithOptionalEncodedData } from '../types/records-types.js';

import { authenticate } from '../core/auth.js';
import { Cid } from '../utils/cid.js';
Expand All @@ -21,11 +21,7 @@ import { StorageController } from '../store/storage-controller.js';
import { DwnError, DwnErrorCode } from '../core/dwn-error.js';
import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';

export type RecordsWriteHandlerOptions = {
skipDataStorage?: boolean; // used for DWN sync
};

type HandlerArgs = { tenant: string, message: RecordsWriteMessage, options?: RecordsWriteHandlerOptions, dataStream?: _Readable.Readable};
type HandlerArgs = { tenant: string, message: RecordsWriteMessage, dataStream?: _Readable.Readable};

export class RecordsWriteHandler implements MethodHandler {

Expand All @@ -34,7 +30,6 @@ export class RecordsWriteHandler implements MethodHandler {
public async handle({
tenant,
message,
options,
dataStream
}: HandlerArgs): Promise<GenericMessageReply> {
let recordsWrite: RecordsWrite;
Expand Down Expand Up @@ -92,46 +87,56 @@ export class RecordsWriteHandler implements MethodHandler {
};
}

const isLatestBaseState = true;
const indexes = await recordsWrite.constructRecordsWriteIndexes(isLatestBaseState);

// if data is below a certain threshold, we embed the data directly into the message for storage in MessageStore.
let messageWithOptionalEncodedData: RecordsWriteMessageWithOptionalEncodedData = message;

// try to store data, unless options explicitly say to skip storage
if (options === undefined || !options.skipDataStorage) {
if (dataStream === undefined && newestExistingMessage?.descriptor.method === DwnMethodName.Delete) {
return messageReplyFromError(new DwnError(DwnErrorCode.RecordsWriteMissingDataStream, 'No data stream was provided with the previous message being a delete'), 400);
}
try {
// NOTE: We allow isLatestBaseState to be true ONLY if the incoming message comes with data, or if the incoming message is NOT an initial write
// This would allow an initial write to be written to the DB without data, but having it not queryable,
// because query implementation filters on `isLatestBaseState` being `true`
// thus preventing a user's attempt to gain authorized access to data by referencing the dataCid of a private data in their initial writes,
// See: https://github.com/TBD54566975/dwn-sdk-js/issues/359 for more info
let isLatestBaseState = false;
let messageWithOptionalEncodedData = message as RecordsWriteMessageWithOptionalEncodedData;

if (dataStream !== undefined) {
messageWithOptionalEncodedData = await this.processMessageWithDataStream(tenant, message, dataStream);
isLatestBaseState = true;
} else {
// else data stream is NOT provided

try {
// if data is below the threshold, we store it within MessageStore
if (message.descriptor.dataSize <= DwnConstant.maxDataSizeAllowedToBeEncoded) {
// processes and sets `encodedData` with appropriate data.
messageWithOptionalEncodedData = await this.processEncodedData(
message,
dataStream,
newestExistingMessage as (RecordsWriteMessage|RecordsDeleteMessage) | undefined
if (newestExistingMessage?.descriptor.method === DwnMethodName.Delete) {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataStream,
'No data stream was provided with the previous message being a delete'
);
} else {
await this.putData(tenant, message, dataStream);
}
} catch (error) {
const e = error as any;
if (e.code === DwnErrorCode.RecordsWriteMissingDataInPrevious ||
e.code === DwnErrorCode.RecordsWriteMissingDataAssociation ||
e.code === DwnErrorCode.RecordsWriteDataCidMismatch ||
e.code === DwnErrorCode.RecordsWriteDataSizeMismatch) {
return messageReplyFromError(error, 400);

// at this point we know that newestExistingMessage exists is not a Delete

// if the incoming message is not an initial write, and no dataStream is provided, we would allow it provided it passes validation
// processMessageWithoutDataStream() abstracts that logic
if (!newMessageIsInitialWrite) {
const newestExistingWrite = newestExistingMessage as RecordsWriteMessageWithOptionalEncodedData;
messageWithOptionalEncodedData = await this.processMessageWithoutDataStream(tenant, message, newestExistingWrite );
isLatestBaseState = true;
}
}

// else throw
throw error;
const indexes = await recordsWrite.constructRecordsWriteIndexes(isLatestBaseState);
await this.messageStore.put(tenant, messageWithOptionalEncodedData, indexes);
await this.eventLog.append(tenant, await Message.getCid(message), indexes);
} catch (error) {
const e = error as any;
if (e.code === DwnErrorCode.RecordsWriteMissingEncodedDataInPrevious ||
e.code === DwnErrorCode.RecordsWriteMissingDataInPrevious ||
e.code === DwnErrorCode.RecordsWriteMissingDataStream ||
e.code === DwnErrorCode.RecordsWriteMissingDataAssociation ||
e.code === DwnErrorCode.RecordsWriteDataCidMismatch ||
e.code === DwnErrorCode.RecordsWriteDataSizeMismatch) {
return messageReplyFromError(error, 400);
}
}

await this.messageStore.put(tenant, messageWithOptionalEncodedData, indexes);
await this.eventLog.append(tenant, await Message.getCid(message), indexes);
// else throw
throw error;
}

const messageReply = {
status: { code: 202, detail: 'Accepted' }
Expand All @@ -146,77 +151,101 @@ export class RecordsWriteHandler implements MethodHandler {
};

/**
* Embeds the record's data into the `encodedData` property.
* If dataStream is present, it uses the dataStream. Otherwise, uses the `encodedData` from the most recent RecordsWrite.
*
* @returns {RecordsWriteMessageWithOptionalEncodedData} `encodedData` embedded.
*
* @throws {DwnError} with `DwnErrorCode.RecordsWriteMissingDataInPrevious`
* if `dataStream` is absent AND `encodedData` of previous message is missing
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataCidMismatch`
* if the data stream resulted in a data CID that mismatches with `dataCid` in the given message
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataSizeMismatch`
* if `dataSize` in `descriptor` given mismatches the actual data size
* Returns a `RecordsWriteMessageWithOptionalEncodedData` with a copy of the incoming message and the incoming data encoded to `Base64URL`.
*/
public async processEncodedData(
public async cloneAndAddEncodedData(message: RecordsWriteMessage, dataBytes: Uint8Array):Promise<RecordsWriteMessageWithOptionalEncodedData> {
const recordsWrite: RecordsWriteMessageWithOptionalEncodedData = { ...message };
recordsWrite.encodedData = Encoder.bytesToBase64Url(dataBytes);
return recordsWrite;
}

private async processMessageWithDataStream(
tenant: string,
message: RecordsWriteMessage,
dataStream: _Readable.Readable,
):Promise<RecordsWriteMessageWithOptionalEncodedData> {
let messageWithOptionalEncodedData: RecordsWriteMessageWithOptionalEncodedData = message;

// if data is below the threshold, we store it within MessageStore
if (message.descriptor.dataSize <= DwnConstant.maxDataSizeAllowedToBeEncoded) {
const dataBytes = await DataStream.toBytes(dataStream!);
const dataCid = await Cid.computeDagPbCidFromBytes(dataBytes);
// validate data integrity before setting.
RecordsWriteHandler.validateDataIntegrity(message.descriptor.dataCid, message.descriptor.dataSize, dataCid, dataBytes.length);
messageWithOptionalEncodedData = await this.cloneAndAddEncodedData(message, dataBytes);
} else {
const messageCid = await Message.getCid(message);
const result = await this.dataStore.put(tenant, messageCid, message.descriptor.dataCid, dataStream);
await this.validateDataStoreIntegrity(tenant, message, result.dataCid, result.dataSize);
}

return messageWithOptionalEncodedData;
}

private async processMessageWithoutDataStream(
tenant: string,
message: RecordsWriteMessage,
dataStream?: _Readable.Readable,
newestExistingMessage?: RecordsWriteMessage | RecordsDeleteMessage
): Promise<RecordsWriteMessageWithOptionalEncodedData> {
let dataBytes;
if (dataStream === undefined) {
const newestWithData = newestExistingMessage as RecordsWriteMessageWithOptionalEncodedData | undefined;
if (newestWithData?.encodedData === undefined) {
newestExistingWrite: RecordsWriteMessageWithOptionalEncodedData,
):Promise<RecordsWriteMessageWithOptionalEncodedData> {
const messageWithOptionalEncodedData: RecordsWriteMessageWithOptionalEncodedData = { ...message }; // clone
const { dataCid, dataSize } = message.descriptor;

// Since incoming message is not an initial write, and no dataStream is provided, we first check integrity against newest existing write.
// we preform the dataCid check in case a user attempts to gain access to data by referencing a different known dataCid,
// so we insure that the data is already associated with the existing newest message
// See: https://github.com/TBD54566975/dwn-sdk-js/issues/359 for more info
RecordsWriteHandler.validateDataIntegrity(dataCid, dataSize, newestExistingWrite.descriptor.dataCid, newestExistingWrite.descriptor.dataSize);

if (dataSize <= DwnConstant.maxDataSizeAllowedToBeEncoded) {
// we encode the data from the original write if it is smaller than the data-store threshold
if (newestExistingWrite.encodedData !== undefined) {
messageWithOptionalEncodedData.encodedData = newestExistingWrite.encodedData;
} else {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataInPrevious,
DwnErrorCode.RecordsWriteMissingEncodedDataInPrevious,
`No dataStream was provided and unable to get data from previous message`
);
} else {
dataBytes = Encoder.base64UrlToBytes(newestWithData.encodedData);
}
} else {
dataBytes = await DataStream.toBytes(dataStream);
}
// attempt to retrieve the data from the previous message
const previousWriteMessageCid = await Message.getCid(newestExistingWrite);
const dataResults = await this.dataStore.get(tenant, previousWriteMessageCid, message.descriptor.dataCid);

const dataCid = await Cid.computeDagPbCidFromBytes(dataBytes);
RecordsWriteHandler.validateDataIntegrity(message.descriptor.dataCid, message.descriptor.dataSize, dataCid, dataBytes.length);
// if it does not exist we have no previous data to associate.
if (dataResults === undefined) {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataInPrevious,
`No dataStream was provided and unable to get data from previous message`
);
}

const recordsWrite: RecordsWriteMessageWithOptionalEncodedData = { ...message };
recordsWrite.encodedData = Encoder.bytesToBase64Url(dataBytes);
return recordsWrite;
const result = await this.dataStore.associate(tenant, await Message.getCid(message), message.descriptor.dataCid);
if (result === undefined) {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataAssociation,
'No dataStream was provided and unable to associate with previous data'
);
}
await this.validateDataStoreIntegrity(tenant, message, result.dataCid, result.dataSize);
}

return messageWithOptionalEncodedData;
}

/**
* Puts the given data in storage unless tenant already has that data for the given recordId
*
* @throws {DwnError} with `DwnErrorCode.RecordsWriteMissingDataAssociation`
* if `dataStream` is absent AND unable to associate data given `dataCid`
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataCidMismatch`
* if the data stream resulted in a data CID that mismatches with `dataCid` in the given message
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataSizeMismatch`
* if `dataSize` in `descriptor` given mismatches the actual data size
* Validates the data integrity after either putting the data or associating it with a new message.
* Upon failure deletes the association, and subsequently the data if there are no other associations.
*/
public async putData(
private async validateDataStoreIntegrity(
tenant: string,
message: RecordsWriteMessage,
dataStream?: _Readable.Readable,
dataCid: string,
dataSize: number
): Promise<void> {
let result: { dataCid: string, dataSize: number };
const messageCid = await Message.getCid(message);

if (dataStream === undefined) {
const associateResult = await this.dataStore.associate(tenant, messageCid, message.descriptor.dataCid);
if (associateResult === undefined) {
throw new DwnError(DwnErrorCode.RecordsWriteMissingDataAssociation, `Unable to associate dataCid ${message.descriptor.dataCid} ` +
`to messageCid ${messageCid} because dataStream was not provided and data was not found in dataStore`);
}
result = associateResult;
} else {
result = await this.dataStore.put(tenant, messageCid, message.descriptor.dataCid, dataStream);
}

try {
RecordsWriteHandler.validateDataIntegrity(message.descriptor.dataCid, message.descriptor.dataSize, result.dataCid, result.dataSize);
RecordsWriteHandler.validateDataIntegrity(message.descriptor.dataCid, message.descriptor.dataSize, dataCid, dataSize);
} catch (error) {
// delete data and throw error to caller
await this.dataStore.delete(tenant, messageCid, message.descriptor.dataCid);
Expand All @@ -232,7 +261,7 @@ export class RecordsWriteHandler implements MethodHandler {
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataSizeMismatch`
* if `dataSize` in `descriptor` given mismatches the actual data size
*/
static validateDataIntegrity(
private static validateDataIntegrity(
expectedDataCid: string,
expectedDataSize: number,
actualDataCid: string,
Expand Down
Loading

0 comments on commit 9c0fdb5

Please sign in to comment.