From 43fdbc900c5944bf8b0bc75d3a74bfdda2494350 Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Fri, 26 Apr 2024 11:01:10 -0700 Subject: [PATCH] Added `prune` feature (#732) 1st of a few PRs for the `prune` feature. This one lights up the capability for anyone who is able to perform `RecordsDelete`, which should be immediately useful to unblock app development. There will need to be 2 more follow up PRs: 1. Its own distinct action support in protocol rules. 2. Ephemeral state keeping to handle unexpected termination and resumption. --- .vscode/launch.json | 3 +- .../interface-methods/records-delete.json | 6 +- src/handlers/records-delete.ts | 5 + src/interfaces/records-delete.ts | 8 +- src/interfaces/records-write.ts | 2 +- src/store/storage-controller.ts | 81 ++++++- src/types/records-types.ts | 7 +- src/utils/records.ts | 15 +- tests/features/records-prune.spec.ts | 212 ++++++++++++++++++ tests/interfaces/records-write.spec.ts | 4 +- tests/test-suite.ts | 2 + 11 files changed, 334 insertions(+), 11 deletions(-) create mode 100644 tests/features/records-prune.spec.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index c74f7d73d..c50d359f4 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,8 +5,7 @@ "version": "0.2.0", "configurations": [ { - "runtimeVersion": "18", - "type": "pwa-node", + "type": "node", "request": "launch", "name": "Tests - Node", "runtimeExecutable": "${workspaceRoot}/node_modules/.bin/mocha", diff --git a/json-schemas/interface-methods/records-delete.json b/json-schemas/interface-methods/records-delete.json index acf27fc35..b2515e0b4 100644 --- a/json-schemas/interface-methods/records-delete.json +++ b/json-schemas/interface-methods/records-delete.json @@ -18,7 +18,8 @@ "interface", "method", "messageTimestamp", - "recordId" + "recordId", + "prune" ], "properties": { "interface": { @@ -38,6 +39,9 @@ }, "recordId": { "type": "string" + }, + "prune": { + "type": "boolean" } } } diff --git a/src/handlers/records-delete.ts 
b/src/handlers/records-delete.ts index 811226c75..e6396a3be 100644 --- a/src/handlers/records-delete.ts +++ b/src/handlers/records-delete.ts @@ -100,6 +100,11 @@ export class RecordsDeleteHandler implements MethodHandler { this.eventStream.emit(tenant, { message, initialWrite }, indexes); } + if (message.descriptor.prune) { + // purge/hard-delete all descendent records + await StorageController.purgeRecordDescendants(tenant, message.descriptor.recordId, this.messageStore, this.dataStore, this.eventLog); + } + // delete all existing messages that are not newest, except for the initial write await StorageController.deleteAllOlderMessagesButKeepInitialWrite( tenant, existingMessages, newestMessage, this.messageStore, this.dataStore, this.eventLog diff --git a/src/interfaces/records-delete.ts b/src/interfaces/records-delete.ts index ad4714e2a..aaa433010 100644 --- a/src/interfaces/records-delete.ts +++ b/src/interfaces/records-delete.ts @@ -18,6 +18,11 @@ export type RecordsDeleteOptions = { protocolRole?: string; signer: Signer; + /** + * Denotes if all the descendent records should be purged. Defaults to `false`. + */ + prune?: boolean + /** * The delegated grant to sign on behalf of the logical author, which is the grantor (`grantedBy`) of the delegated grant. */ @@ -52,8 +57,9 @@ export class RecordsDelete extends AbstractMessage { const descriptor: RecordsDeleteDescriptor = { interface : DwnInterfaceName.Records, method : DwnMethodName.Delete, + messageTimestamp : options.messageTimestamp ?? currentTime, recordId, - messageTimestamp : options.messageTimestamp ?? currentTime + prune : options.prune ?? 
false }; const authorization = await Message.createAuthorization({ diff --git a/src/interfaces/records-write.ts b/src/interfaces/records-write.ts index 34e5bcf4c..09f2d7137 100644 --- a/src/interfaces/records-write.ts +++ b/src/interfaces/records-write.ts @@ -322,7 +322,7 @@ export class RecordsWrite implements MessageInterface { if ((options.data === undefined && options.dataCid === undefined) || (options.data !== undefined && options.dataCid !== undefined)) { - throw new DwnError(DwnErrorCode.RecordsWriteCreateDataAndDataCidMutuallyExclusive, 'one and only one parameter between `data` and `dataCid` is allowed'); + throw new DwnError(DwnErrorCode.RecordsWriteCreateDataAndDataCidMutuallyExclusive, 'one and only one parameter between `data` and `dataCid` is required'); } if ((options.dataCid === undefined && options.dataSize !== undefined) || diff --git a/src/store/storage-controller.ts b/src/store/storage-controller.ts index fe3512022..da2bffcc2 100644 --- a/src/store/storage-controller.ts +++ b/src/store/storage-controller.ts @@ -2,12 +2,13 @@ import type { DataStore } from '../types/data-store.js'; import type { EventLog } from '../types/event-log.js'; import type { GenericMessage } from '../types/message-types.js'; import type { MessageStore } from '../types/message-store.js'; -import type { RecordsQueryReplyEntry, RecordsWriteMessage } from '../types/records-types.js'; +import type { RecordsDeleteMessage, RecordsQueryReplyEntry, RecordsWriteMessage } from '../types/records-types.js'; import { DwnConstant } from '../core/dwn-constant.js'; -import { DwnMethodName } from '../enums/dwn-interface-method.js'; import { Message } from '../core/message.js'; +import { Records } from '../utils/records.js'; import { RecordsWrite } from '../interfaces/records-write.js'; +import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js'; /** * A class that provides an abstraction for the usage of MessageStore, DataStore, and EventLog. 
@@ -44,6 +45,82 @@ export class StorageController {
     await dataStore.delete(tenant, recordsWriteMessage.recordId, recordsWriteMessage.descriptor.dataCid);
   }
 
+  /**
+   * Purges (permanently hard-deletes) all descendant records of the given `recordId`: their messages, events and data.
+   * Works bottom-up: a child's own descendants are purged before the child's messages; `recordId` itself is left untouched.
+   */
+  public static async purgeRecordDescendants(
+    tenant: string,
+    recordId: string,
+    messageStore: MessageStore,
+    dataStore: DataStore,
+    eventLog: EventLog
+  ): Promise<void> {
+    const filter = {
+      interface : DwnInterfaceName.Records,
+      parentId  : recordId
+    };
+    const { messages: childMessages } = await messageStore.query(tenant, [filter]);
+
+    // group the child messages by the `recordId` of the child record they belong to
+    const recordIdToMessagesMap = new Map<string, GenericMessage[]>();
+    for (const message of childMessages) {
+      // a `RecordsWrite` carries `recordId` at the top level; any other message is treated as a `RecordsDelete`
+      const childRecordId: string = Records.isRecordsWrite(message)
+        ? message.recordId
+        : (message as RecordsDeleteMessage).descriptor.recordId;
+
+      const messagesOfChild = recordIdToMessagesMap.get(childRecordId) ?? [];
+      messagesOfChild.push(message);
+      recordIdToMessagesMap.set(childRecordId, messagesOfChild); // re-setting an existing key keeps its insertion order
+    }
+
+    // purge the descendants of every child first
+    for (const childRecordId of recordIdToMessagesMap.keys()) {
+      await StorageController.purgeRecordDescendants(tenant, childRecordId, messageStore, dataStore, eventLog);
+    }
+
+    // then purge the messages of the children themselves
+    for (const messagesOfChild of recordIdToMessagesMap.values()) {
+      await StorageController.purgeRecordMessages(tenant, messagesOfChild, messageStore, dataStore, eventLog);
+    }
+  }
+
+  /**
+   * Purges (permanently hard-deletes) all given messages of the SAME `recordId` and their associated data and events.
+   * Assumes that the given `recordMessages` are all of the same `recordId`.
+   * Tolerates `recordMessages` that contain no `RecordsWrite`: the data store deletion is then skipped.
+   */
+  private static async purgeRecordMessages(
+    tenant: string,
+    recordMessages: GenericMessage[],
+    messageStore: MessageStore,
+    dataStore: DataStore,
+    eventLog: EventLog
+  ): Promise<void> {
+    // delete the data from the data store first so no chance of orphaned data (not having a message referencing it) in case of server crash
+    // NOTE: only the `RecordsWrite` with latest timestamp can possibly have data associated with it so we do this filtering as an optimization
+    // NOTE: however there could still be no data associated with the `RecordsWrite` with newest timestamp, because either:
+    //       1. the data is encoded with the message itself; or
+    //       2. the newest `RecordsWrite` may not be the "true" latest state due to:
+    //          a. sync has yet to write the latest `RecordsWrite`; or
+    //          b. `recordMessages` maybe an incomplete page of results if the caller uses the paging in its query
+    // Calling dataStore.delete() is a no-op if the data is not found, so we are safe to call it redundantly.
+    const recordsWrites = recordMessages.filter((message) => message.descriptor.method === DwnMethodName.Write);
+    // guard: without any `RecordsWrite` there is no newest write to dereference, and no data to look up
+    if (recordsWrites.length > 0) {
+      const newestRecordsWrite = (await Message.getNewestMessage(recordsWrites)) as RecordsWriteMessage;
+      await dataStore.delete(tenant, newestRecordsWrite.recordId, newestRecordsWrite.descriptor.dataCid);
+    }
+
+    // then delete all events associated with the record messages before deleting the messages so we don't have orphaned events
+    const messageCids = await Promise.all(recordMessages.map((message) => Message.getCid(message)));
+    await eventLog.deleteEventsByCid(tenant, messageCids);
+
+    // finally delete all record messages
+    await Promise.all(messageCids.map((messageCid) => messageStore.delete(tenant, messageCid)));
+  }
+
   /**
    * Deletes all messages in `existingMessages` that are older than the `newestMessage` in the given tenant,
    * but keep the initial write write for future processing by ensuring its `isLatestBaseState` index is "false".
diff --git a/src/types/records-types.ts b/src/types/records-types.ts index 9a490241f..340d60378 100644 --- a/src/types/records-types.ts +++ b/src/types/records-types.ts @@ -227,6 +227,11 @@ export type RecordsDeleteMessage = GenericMessage & { export type RecordsDeleteDescriptor = { interface: DwnInterfaceName.Records; method: DwnMethodName.Delete; - recordId: string; messageTimestamp: string; + recordId: string; + + /** + * Denotes if all the descendent records should be purged. + */ + prune: boolean }; \ No newline at end of file diff --git a/src/utils/records.ts b/src/utils/records.ts index b7d15c57e..0b3275349 100644 --- a/src/utils/records.ts +++ b/src/utils/records.ts @@ -1,7 +1,7 @@ import type { DerivedPrivateJwk } from './hd-key.js'; -import type { GenericSignaturePayload } from '../types/message-types.js'; import type { Readable } from 'readable-stream'; import type { Filter, KeyValues, StartsWithFilter } from '../types/query-types.js'; +import type { GenericMessage, GenericSignaturePayload } from '../types/message-types.js'; import type { RecordsDeleteMessage, RecordsFilter, RecordsQueryMessage, RecordsReadMessage, RecordsSubscribeMessage, RecordsWriteDescriptor, RecordsWriteMessage, RecordsWriteTags, RecordsWriteTagsFilter } from '../types/records-types.js'; import { DateSort } from '../types/records-types.js'; @@ -15,12 +15,25 @@ import { PermissionGrant } from '../protocols/permission-grant.js'; import { removeUndefinedProperties } from './object.js'; import { Secp256k1 } from './secp256k1.js'; import { DwnError, DwnErrorCode } from '../core/dwn-error.js'; +import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js'; import { normalizeProtocolUrl, normalizeSchemaUrl } from './url.js'; /** * Class containing useful utilities related to the Records interface. */ export class Records { + + /** + * Checks if the given message is a `RecordsWriteMessage`. 
+ */ + public static isRecordsWrite(message: GenericMessage): message is RecordsWriteMessage { + const isRecordsWrite = + message.descriptor.interface === DwnInterfaceName.Records && + message.descriptor.method === DwnMethodName.Write; + + return isRecordsWrite; + } + /** * Decrypts the encrypted data in a message reply using the given ancestor private key. * @param ancestorPrivateKey Any ancestor private key in the key derivation path. diff --git a/tests/features/records-prune.spec.ts b/tests/features/records-prune.spec.ts new file mode 100644 index 000000000..9700798a8 --- /dev/null +++ b/tests/features/records-prune.spec.ts @@ -0,0 +1,212 @@ +import type { DidResolver } from '@web5/dids'; +import type { EventStream } from '../../src/types/subscriptions.js'; +import type { + DataStore, + EventLog, + MessageStore +} from '../../src/index.js'; + +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import chai, { expect } from 'chai'; + +import nestedProtocolDefinition from '../vectors/protocol-definitions/nested.json' assert { type: 'json' }; + +import { DwnInterfaceName } from '../../src/enums/dwn-interface-method.js'; +import { Message } from '../../src/core/message.js'; +import { TestDataGenerator } from '../utils/test-data-generator.js'; +import { TestEventStream } from '../test-event-stream.js'; +import { TestStores } from '../test-stores.js'; +import { DataStream, Dwn, DwnConstant, Jws, ProtocolsConfigure, RecordsDelete, RecordsQuery, RecordsWrite, SortDirection } from '../../src/index.js'; +import { DidKey, UniversalResolver } from '@web5/dids'; + +chai.use(chaiAsPromised); + +export function testRecordsPrune(): void { + describe('records pruning', () => { + let didResolver: DidResolver; + let messageStore: MessageStore; + let dataStore: DataStore; + let eventLog: EventLog; + let eventStream: EventStream; + let dwn: Dwn; + + // important to follow the `before` and `after` pattern to initialize and clean the stores in tests + // so 
that different test suites can reuse the same backend store for testing + before(async () => { + didResolver = new UniversalResolver({ didResolvers: [DidKey] }); + + const stores = TestStores.get(); + messageStore = stores.messageStore; + dataStore = stores.dataStore; + eventLog = stores.eventLog; + eventStream = TestEventStream.get(); + + dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); + }); + + beforeEach(async () => { + sinon.restore(); // wipe all previous stubs/spies/mocks/fakes + + // clean up before each test rather than after so that a test does not depend on other tests to do the clean up + await messageStore.clear(); + await dataStore.clear(); + await eventLog.clear(); + }); + + after(async () => { + await dwn.close(); + }); + + it('should purge all descendants when given RecordsDelete with `prune` set to `true`', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + // create a protocol with foo <- bar <- baz structure + const nestedProtocol = nestedProtocolDefinition; + const protocolsConfig = await ProtocolsConfigure.create({ + definition : nestedProtocol, + signer : Jws.createSigner(alice) + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + // writes 2 foos, 2 bars under foo1, and 2 bazes under bar1 + + // write 2 foos + const fooData = TestDataGenerator.randomBytes(100); + const fooOptions = { + signer : Jws.createSigner(alice), + protocol : nestedProtocol.protocol, + protocolPath : 'foo', + schema : nestedProtocol.types.foo.schema, + dataFormat : nestedProtocol.types.foo.dataFormats[0], + data : fooData + }; + + const foo1 = await RecordsWrite.create(fooOptions); + const foo1WriteResponse = await dwn.processMessage(alice.did, foo1.message, { dataStream: DataStream.fromBytes(fooData) }); + expect(foo1WriteResponse.status.code).equals(202); + + const foo2 = await 
RecordsWrite.create(fooOptions); + const foo2WriteResponse = await dwn.processMessage(alice.did, foo2.message, { dataStream: DataStream.fromBytes(fooData) }); + expect(foo2WriteResponse.status.code).equals(202); + + // write 2 bars under foo1 with data large enough to be required to be stored in the data store so we can test purge in data store + const barData = TestDataGenerator.randomBytes(DwnConstant.maxDataSizeAllowedToBeEncoded + 1); + const barOptions = { + signer : Jws.createSigner(alice), + protocol : nestedProtocol.protocol, + protocolPath : 'foo/bar', + schema : nestedProtocol.types.bar.schema, + dataFormat : nestedProtocol.types.bar.dataFormats[0], + parentContextId : foo1.message.contextId, + data : barData + }; + + const bar1 = await RecordsWrite.create({ ...barOptions }); + const bar1WriteResponse = await dwn.processMessage(alice.did, bar1.message, { dataStream: DataStream.fromBytes(barData) }); + expect(bar1WriteResponse.status.code).equals(202); + + const bar2 = await RecordsWrite.create({ ...barOptions }); + const bar2WriteResponse = await dwn.processMessage(alice.did, bar2.message, { dataStream: DataStream.fromBytes(barData) }); + expect(bar2WriteResponse.status.code).equals(202); + + // write 2 bazes under bar1, each has more than 1 message associated with the record so we can test multi-message purge + const bazData = TestDataGenerator.randomBytes(100); + const bazOptions = { + signer : Jws.createSigner(alice), + protocol : nestedProtocol.protocol, + protocolPath : 'foo/bar/baz', + schema : nestedProtocol.types.baz.schema, + dataFormat : nestedProtocol.types.baz.dataFormats[0], + parentContextId : bar1.message.contextId, + data : bazData + }; + + const baz1 = await RecordsWrite.create({ ...bazOptions }); + const baz1WriteResponse = await dwn.processMessage(alice.did, baz1.message, { dataStream: DataStream.fromBytes(bazData) }); + expect(baz1WriteResponse.status.code).equals(202); + + const baz2 = await RecordsWrite.create({ ...bazOptions }); + 
const baz2WriteResponse = await dwn.processMessage(alice.did, baz2.message, { dataStream: DataStream.fromBytes(bazData) }); + expect(baz2WriteResponse.status.code).equals(202); + + // make latest state of baz1 a `RecordsWrite` + const newBaz1Data = TestDataGenerator.randomBytes(100); + const baz1Update = await RecordsWrite.createFrom({ + signer : Jws.createSigner(alice), + recordsWriteMessage : baz1.message, + data : newBaz1Data + }); + const baz1UpdateResponse = await dwn.processMessage(alice.did, baz1Update.message, { dataStream: DataStream.fromBytes(newBaz1Data) }); + expect(baz1UpdateResponse.status.code).equals(202); + + // make latest state of baz2 a `RecordsDelete` + const baz2Delete = await RecordsDelete.create({ + signer : Jws.createSigner(alice), + recordId : baz2.message.recordId + }); + const baz2DeleteResponse = await dwn.processMessage(alice.did, baz2Delete.message); + expect(baz2DeleteResponse.status.code).equals(202); + + // sanity test messages are inserted in message store + const queryFilter = [{ + interface : DwnInterfaceName.Records, + protocol : nestedProtocol.protocol + }]; + const queryResult = await messageStore.query(alice.did, queryFilter); + expect(queryResult.messages.length).to.equal(8); // 2 foos, 2 bars, 2 bazes x 2 messages each + + // sanity test events are inserted in event log + const { events } = await eventLog.queryEvents(alice.did, queryFilter); + expect(events.length).to.equal(8); + + // sanity test data is inserted in data store + const bar1DataGetResult = await dataStore.get(alice.did, bar1.message.recordId, bar1.message.descriptor.dataCid); + const bar2DataGetResult = await dataStore.get(alice.did, bar2.message.recordId, bar2.message.descriptor.dataCid); + expect(bar1DataGetResult).to.not.be.undefined; + expect(bar2DataGetResult).to.not.be.undefined; + + // Delete foo1 with prune enabled + const foo1Delete = await RecordsDelete.create({ + recordId : foo1.message.recordId, + prune : true, + signer : Jws.createSigner(alice) 
+ }); + + const deleteReply = await dwn.processMessage(alice.did, foo1Delete.message); + expect(deleteReply.status.code).to.equal(202); + + // verify all bar and baz message are permanently deleted + const queryResult2 = await messageStore.query(alice.did, queryFilter, { messageTimestamp: SortDirection.Ascending }); + expect(queryResult2.messages.length).to.equal(3); // foo2 RecordsWrite, foo1 RecordsWrite and RecordsDelete + expect(queryResult2.messages[0]).to.deep.include(foo1.message); + expect(queryResult2.messages[1]).to.deep.include(foo2.message); + expect(queryResult2.messages[2]).to.deep.include(foo1Delete.message); + + // verify all bar and baz events are permanently deleted + const { events: events2 } = await eventLog.queryEvents(alice.did, queryFilter); + expect(events2.length).to.equal(3); + const foo1RecordsWriteCid = await Message.getCid(foo1.message); + const foo2RecordsWriteCid = await Message.getCid(foo2.message); + const foo2RecordsDeleteCid = await Message.getCid(foo1Delete.message); + expect(events2).to.contain.members([foo1RecordsWriteCid, foo2RecordsWriteCid, foo2RecordsDeleteCid]); + + // verify all bar data are permanently deleted + const bar1DataGetResult2 = await dataStore.get(alice.did, bar1.message.recordId, bar1.message.descriptor.dataCid); + const bar2DataGetResult2 = await dataStore.get(alice.did, bar2.message.recordId, bar2.message.descriptor.dataCid); + expect(bar1DataGetResult2).to.be.undefined; + expect(bar2DataGetResult2).to.be.undefined; + + // sanity test an external query will no longer return the deleted records + const queryData = await RecordsQuery.create({ + signer : Jws.createSigner(alice), + filter : { protocol: nestedProtocol.protocol } + }); + const reply2 = await dwn.processMessage(alice.did, queryData.message); + expect(reply2.status.code).to.equal(200); + expect(reply2.entries?.length).to.equal(1); // only foo2 is left + expect(reply2.entries![0]).to.deep.include(foo2.message); + }); + }); +} \ No newline at end of 
file diff --git a/tests/interfaces/records-write.spec.ts b/tests/interfaces/records-write.spec.ts index 4fe373778..354378d5d 100644 --- a/tests/interfaces/records-write.spec.ts +++ b/tests/interfaces/records-write.spec.ts @@ -77,7 +77,7 @@ describe('RecordsWrite', () => { }; const createPromise1 = RecordsWrite.create(options1); - await expect(createPromise1).to.be.rejectedWith('one and only one parameter between `data` and `dataCid` is allowed'); + await expect(createPromise1).to.be.rejectedWith(DwnErrorCode.RecordsWriteCreateDataAndDataCidMutuallyExclusive); // testing `data` and `dataCid` both undefined const options2 = { @@ -92,7 +92,7 @@ describe('RecordsWrite', () => { }; const createPromise2 = RecordsWrite.create(options2); - await expect(createPromise2).to.be.rejectedWith('one and only one parameter between `data` and `dataCid` is allowed'); + await expect(createPromise2).to.be.rejectedWith(DwnErrorCode.RecordsWriteCreateDataAndDataCidMutuallyExclusive); }); it('should required `dataCid` and `dataSize` to be both defined or undefined at the same time', async () => { diff --git a/tests/test-suite.ts b/tests/test-suite.ts index dd8926237..3d2ac74b1 100644 --- a/tests/test-suite.ts +++ b/tests/test-suite.ts @@ -20,6 +20,7 @@ import { testProtocolsConfigureHandler } from './handlers/protocols-configure.sp import { testProtocolsQueryHandler } from './handlers/protocols-query.spec.js'; import { testProtocolUpdateAction } from './features/protocol-update-action.spec.js'; import { testRecordsDeleteHandler } from './handlers/records-delete.spec.js'; +import { testRecordsPrune } from './features/records-prune.spec.js'; import { testRecordsQueryHandler } from './handlers/records-query.spec.js'; import { testRecordsReadHandler } from './handlers/records-read.spec.js'; import { testRecordsSubscribeHandler } from './handlers/records-subscribe.spec.js'; @@ -68,6 +69,7 @@ export class TestSuite { testProtocolCreateAction(); testProtocolDeleteAction(); 
testProtocolUpdateAction(); + testRecordsPrune(); testRecordsTags(); // scenario tests