Skip to content

Commit

Permalink
Added prune feature (#732)
Browse files Browse the repository at this point in the history
First of several PRs for the `prune` feature. This one enables the
capability for anyone who is able to perform a `RecordsDelete`, which
should be immediately useful for unblocking app development.

Two more follow-up PRs will be needed:

1. Support for `prune` as its own distinct action in protocol rules.
2. Ephemeral state keeping, so that a prune can handle unexpected termination
and be resumed afterwards.
  • Loading branch information
thehenrytsai authored Apr 26, 2024
1 parent a23bbb3 commit 43fdbc9
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 11 deletions.
3 changes: 1 addition & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
"version": "0.2.0",
"configurations": [
{
"runtimeVersion": "18",
"type": "pwa-node",
"type": "node",
"request": "launch",
"name": "Tests - Node",
"runtimeExecutable": "${workspaceRoot}/node_modules/.bin/mocha",
Expand Down
6 changes: 5 additions & 1 deletion json-schemas/interface-methods/records-delete.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"interface",
"method",
"messageTimestamp",
"recordId"
"recordId",
"prune"
],
"properties": {
"interface": {
Expand All @@ -38,6 +39,9 @@
},
"recordId": {
"type": "string"
},
"prune": {
"type": "boolean"
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/handlers/records-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ export class RecordsDeleteHandler implements MethodHandler {
this.eventStream.emit(tenant, { message, initialWrite }, indexes);
}

if (message.descriptor.prune) {
// purge/hard-delete all descendent records
await StorageController.purgeRecordDescendants(tenant, message.descriptor.recordId, this.messageStore, this.dataStore, this.eventLog);
}

// delete all existing messages that are not newest, except for the initial write
await StorageController.deleteAllOlderMessagesButKeepInitialWrite(
tenant, existingMessages, newestMessage, this.messageStore, this.dataStore, this.eventLog
Expand Down
8 changes: 7 additions & 1 deletion src/interfaces/records-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ export type RecordsDeleteOptions = {
protocolRole?: string;
signer: Signer;

/**
* Denotes if all the descendent records should be purged. Defaults to `false`.
*/
prune?: boolean

/**
* The delegated grant to sign on behalf of the logical author, which is the grantor (`grantedBy`) of the delegated grant.
*/
Expand Down Expand Up @@ -52,8 +57,9 @@ export class RecordsDelete extends AbstractMessage<RecordsDeleteMessage> {
const descriptor: RecordsDeleteDescriptor = {
interface : DwnInterfaceName.Records,
method : DwnMethodName.Delete,
messageTimestamp : options.messageTimestamp ?? currentTime,
recordId,
messageTimestamp : options.messageTimestamp ?? currentTime
prune : options.prune ?? false
};

const authorization = await Message.createAuthorization({
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/records-write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ export class RecordsWrite implements MessageInterface<RecordsWriteMessage> {

if ((options.data === undefined && options.dataCid === undefined) ||
(options.data !== undefined && options.dataCid !== undefined)) {
throw new DwnError(DwnErrorCode.RecordsWriteCreateDataAndDataCidMutuallyExclusive, 'one and only one parameter between `data` and `dataCid` is allowed');
throw new DwnError(DwnErrorCode.RecordsWriteCreateDataAndDataCidMutuallyExclusive, 'one and only one parameter between `data` and `dataCid` is required');
}

if ((options.dataCid === undefined && options.dataSize !== undefined) ||
Expand Down
81 changes: 79 additions & 2 deletions src/store/storage-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import type { DataStore } from '../types/data-store.js';
import type { EventLog } from '../types/event-log.js';
import type { GenericMessage } from '../types/message-types.js';
import type { MessageStore } from '../types/message-store.js';
import type { RecordsQueryReplyEntry, RecordsWriteMessage } from '../types/records-types.js';
import type { RecordsDeleteMessage, RecordsQueryReplyEntry, RecordsWriteMessage } from '../types/records-types.js';

import { DwnConstant } from '../core/dwn-constant.js';
import { DwnMethodName } from '../enums/dwn-interface-method.js';
import { Message } from '../core/message.js';
import { Records } from '../utils/records.js';
import { RecordsWrite } from '../interfaces/records-write.js';
import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';

/**
* A class that provides an abstraction for the usage of MessageStore, DataStore, and EventLog.
Expand Down Expand Up @@ -44,6 +45,82 @@ export class StorageController {
await dataStore.delete(tenant, recordsWriteMessage.recordId, recordsWriteMessage.descriptor.dataCid);
}

/**
 * Purges (permanent hard-delete) all descendant records of the given `recordId`.
 *
 * Every message whose `parentId` is the given `recordId` is queried and grouped by its own record ID.
 * For each such child record, the child's own descendants are purged first (recursively),
 * and only then are the child's messages purged via `purgeRecordMessages()`.
 * The messages of the given `recordId` itself are NOT touched by this method.
 *
 * @param tenant The tenant whose stores are operated on.
 * @param recordId ID of the record whose descendants are to be purged.
 * @param messageStore Store queried for the child messages and from which they are deleted.
 * @param dataStore Store passed through for deleting data associated with the purged records.
 * @param eventLog Event log passed through for deleting events associated with the purged messages.
 */
public static async purgeRecordDescendants(
  tenant: string,
  recordId: string,
  messageStore: MessageStore,
  dataStore: DataStore,
  eventLog: EventLog
): Promise<void> {
  const filter = {
    interface : DwnInterfaceName.Records,
    parentId  : recordId
  };
  const { messages: childMessages } = await messageStore.query(tenant, [filter]);

  // group the child messages by the record ID of the child they belong to
  const childRecordIdToMessagesMap = new Map<string, GenericMessage[]>();
  for (const message of childMessages) {
    // NOTE: named `childRecordId` so it does not shadow the `recordId` parameter of this method;
    // a `RecordsWrite` carries its record ID at the top level, any other message is treated as a `RecordsDelete`
    const childRecordId = Records.isRecordsWrite(message)
      ? message.recordId
      : (message as RecordsDeleteMessage).descriptor.recordId;

    const messagesOfChild = childRecordIdToMessagesMap.get(childRecordId);
    if (messagesOfChild === undefined) {
      childRecordIdToMessagesMap.set(childRecordId, [message]);
    } else {
      messagesOfChild.push(message);
    }
  }

  // purge the descendants of every child first, one child at a time
  for (const childRecordId of childRecordIdToMessagesMap.keys()) {
    await StorageController.purgeRecordDescendants(tenant, childRecordId, messageStore, dataStore, eventLog);
  }

  // then purge the child messages themselves
  for (const messagesOfChild of childRecordIdToMessagesMap.values()) {
    await StorageController.purgeRecordMessages(tenant, messagesOfChild, messageStore, dataStore, eventLog);
  }
}

/**
 * Purges (permanent hard-delete) all messages of the SAME `recordId` given and their associated data and events.
 * Assumes that the given `recordMessages` are all of the same `recordId`.
 *
 * Deletion order is: data first, then events, then the messages themselves.
 *
 * @param tenant The tenant whose stores are operated on.
 * @param recordMessages All messages to purge; expected to belong to one single record.
 * @param messageStore Store from which the messages are deleted.
 * @param dataStore Store from which the data of the newest `RecordsWrite` is deleted.
 * @param eventLog Event log from which the events of the messages are deleted.
 */
private static async purgeRecordMessages(
  tenant: string,
  recordMessages: GenericMessage[],
  messageStore: MessageStore,
  dataStore: DataStore,
  eventLog: EventLog
): Promise<void> {
  // delete the data from the data store first so no chance of orphaned data (not having a message referencing it) in case of server crash
  // NOTE: only the `RecordsWrite` with latest timestamp can possibly have data associated with it so we do this filtering as an optimization
  // NOTE: however there could still be no data associated with the `RecordsWrite` with newest timestamp, because either:
  //       1. the data is encoded with the message itself; or
  //       2. the newest `RecordsWrite` may not be the "true" latest state due to:
  //          a. sync has yet to write the latest `RecordsWrite`; or
  //          b. `recordMessages` maybe an incomplete page of results if the caller uses the paging in its query
  // Calling dataStore.delete() is a no-op if the data is not found, so we are safe to call it redundantly.
  const recordsWrites = recordMessages.filter((message) => message.descriptor.method === DwnMethodName.Write);

  // guard: the given messages may contain no `RecordsWrite` at all (e.g. an incomplete page holding only a `RecordsDelete`),
  // in which case there is no data reference to delete and we must not dereference a missing message
  if (recordsWrites.length > 0) {
    const newestRecordsWrite = (await Message.getNewestMessage(recordsWrites)) as RecordsWriteMessage | undefined;
    if (newestRecordsWrite !== undefined) {
      await dataStore.delete(tenant, newestRecordsWrite.recordId, newestRecordsWrite.descriptor.dataCid);
    }
  }

  // then delete all events associated with the record messages before deleting the messages so we don't have orphaned events
  const messageCids = await Promise.all(recordMessages.map((message) => Message.getCid(message)));
  await eventLog.deleteEventsByCid(tenant, messageCids);

  // finally delete all record messages
  await Promise.all(messageCids.map((messageCid) => messageStore.delete(tenant, messageCid)));
}

/**
* Deletes all messages in `existingMessages` that are older than the `newestMessage` in the given tenant,
* but keeps the initial write for future processing by ensuring its `isLatestBaseState` index is "false".
Expand Down
7 changes: 6 additions & 1 deletion src/types/records-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ export type RecordsDeleteMessage = GenericMessage & {
/**
 * Descriptor of a `RecordsDelete` message.
 */
export type RecordsDeleteDescriptor = {
  interface: DwnInterfaceName.Records;
  method: DwnMethodName.Delete;
  messageTimestamp: string;
  recordId: string;

  /**
   * Denotes if all the descendant records should be purged.
   */
  prune: boolean;
};
15 changes: 14 additions & 1 deletion src/utils/records.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { DerivedPrivateJwk } from './hd-key.js';
import type { GenericSignaturePayload } from '../types/message-types.js';
import type { Readable } from 'readable-stream';
import type { Filter, KeyValues, StartsWithFilter } from '../types/query-types.js';
import type { GenericMessage, GenericSignaturePayload } from '../types/message-types.js';
import type { RecordsDeleteMessage, RecordsFilter, RecordsQueryMessage, RecordsReadMessage, RecordsSubscribeMessage, RecordsWriteDescriptor, RecordsWriteMessage, RecordsWriteTags, RecordsWriteTagsFilter } from '../types/records-types.js';

import { DateSort } from '../types/records-types.js';
Expand All @@ -15,12 +15,25 @@ import { PermissionGrant } from '../protocols/permission-grant.js';
import { removeUndefinedProperties } from './object.js';
import { Secp256k1 } from './secp256k1.js';
import { DwnError, DwnErrorCode } from '../core/dwn-error.js';
import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
import { normalizeProtocolUrl, normalizeSchemaUrl } from './url.js';

/**
* Class containing useful utilities related to the Records interface.
*/
export class Records {

/**
 * Type guard that tells whether the given message is a `RecordsWriteMessage`,
 * i.e. its descriptor names the `Records` interface and the `Write` method.
 */
public static isRecordsWrite(message: GenericMessage): message is RecordsWriteMessage {
  const { interface: interfaceName, method: methodName } = message.descriptor;
  return interfaceName === DwnInterfaceName.Records && methodName === DwnMethodName.Write;
}

/**
* Decrypts the encrypted data in a message reply using the given ancestor private key.
* @param ancestorPrivateKey Any ancestor private key in the key derivation path.
Expand Down
Loading

0 comments on commit 43fdbc9

Please sign in to comment.