From 8f6e01e77c91d8932136b7a0666d31b83676e76b Mon Sep 17 00:00:00 2001 From: LiranCohen Date: Wed, 24 Jan 2024 18:48:13 -0500 Subject: [PATCH] RecordsSubscribe (#667) This PR builds on top of the existing `EventStream` and adds a subscription specifically for the `Records` interface which follows similar rules as those for `RecordsQuery`. Currently the subscription does not evict or re-authorize subscriptions after the initial processing of the message. So if a revocation of permissions happens after the subscription is live, the user will continue getting notified until they close the connection. This will be addressed in a separate PR to keep things more concise and easier to follow. https://github.com/TBD54566975/dwn-sdk-js/issues/668 --- Q_AND_A.md | 10 +- build/compile-validators.js | 2 + .../interface-methods/protocol-rule-set.json | 1 + .../interface-methods/records-subscribe.json | 44 ++ .../permissions/permissions-definitions.json | 3 + json-schemas/permissions/scopes.json | 18 + src/core/dwn-error.ts | 4 +- src/core/protocol-authorization.ts | 23 +- src/core/records-grant-authorization.ts | 24 +- src/dwn.ts | 22 +- src/handlers/events-subscribe.ts | 2 +- src/handlers/records-query.ts | 42 +- src/handlers/records-subscribe.ts | 215 ++++++ src/index.ts | 7 +- src/interfaces/records-query.ts | 4 +- src/interfaces/records-subscribe.ts | 104 +++ src/types/message-types.ts | 9 - src/types/method-handler.ts | 3 +- src/types/records-types.ts | 25 +- src/types/subscriptions.ts | 2 +- src/utils/records.ts | 30 +- tests/dwn.spec.ts | 4 +- tests/handlers/events-subscribe.spec.ts | 1 - tests/handlers/protocols-configure.spec.ts | 1 - tests/handlers/records-query.spec.ts | 4 +- tests/handlers/records-read.spec.ts | 1 - tests/handlers/records-subscribe.spec.ts | 704 ++++++++++++++++++ tests/handlers/records-write.spec.ts | 1 - tests/interfaces/records-subscribe.spec.ts | 79 ++ tests/scenarios/delegated-grant.spec.ts | 168 ++++- tests/scenarios/end-to-end-tests.spec.ts | 1 - tests/scenarios/subscriptions.spec.ts | 283 +++++++ tests/test-suite.ts | 2 + tests/utils/test-data-generator.ts | 58 +- .../protocol-definitions/friend-role.json | 4 + .../protocol-definitions/thread-role.json | 4 + 36 files changed, 1805 insertions(+), 104 deletions(-) create mode 100644 json-schemas/interface-methods/records-subscribe.json create mode 100644 src/handlers/records-subscribe.ts create mode 100644 src/interfaces/records-subscribe.ts create mode 100644 tests/handlers/records-subscribe.spec.ts create mode 100644 tests/interfaces/records-subscribe.spec.ts diff --git a/Q_AND_A.md b/Q_AND_A.md index 611a03f4d..21e592212 100644 --- a/Q_AND_A.md +++ b/Q_AND_A.md @@ -66,7 +66,7 @@ No. -- When making `RecordsQuery` by invoking a protocol role, why is `protocolPath` a required filter property? This means that one cannot filter records under a `protocol` or `contextId` irrespective of the `protocolPath`, thus is forced to make multiple queries (ie. one per `protocolPath`). +- When making `RecordsQuery` or `RecordsSubscribe` by invoking a protocol role, why is `protocolPath` a required filter property? This means that one cannot filter records under a `protocol` or `contextId` irrespective of the `protocolPath`, thus is forced to make multiple queries (ie. one per `protocolPath`). (Last update: 2023/11/03) @@ -79,3 +79,11 @@ - `write` - allows a DID to create and update the record they have created - `update` - allows a DID to update a record, regardless of the initial author + ## Subscriptions +- What happens to a subscription which is listening to events, but is no longer authorized due to revocation of a grant or role? + + (Last update: 2024/01/23) + + Currently if a subscription is no longer authorized but it is still active, the subscriber will still receive updates until they close the subscription themselves. If they try to re-subscribe after that, it will be rejected with a 401. + + This will be addressed in a future upgrade and we've created an issue to track it. https://github.com/TBD54566975/dwn-sdk-js/issues/668 - last updated (2024/01/22) \ No newline at end of file diff --git a/build/compile-validators.js b/build/compile-validators.js index 9d9bc0c43..038963192 100644 --- a/build/compile-validators.js +++ b/build/compile-validators.js @@ -46,6 +46,7 @@ import RecordsDelete from '../json-schemas/interface-methods/records-delete.json import RecordsFilter from '../json-schemas/interface-methods/records-filter.json' assert { type: 'json' }; import RecordsQuery from '../json-schemas/interface-methods/records-query.json' assert { type: 'json' }; import RecordsRead from '../json-schemas/interface-methods/records-read.json' assert { type: 'json' }; +import RecordsSubscribe from '../json-schemas/interface-methods/records-subscribe.json' assert { type: 'json' }; import RecordsWrite from '../json-schemas/interface-methods/records-write.json' assert { type: 'json' }; import RecordsWriteSignaturePayload from '../json-schemas/signature-payloads/records-write-signature-payload.json' assert { type: 'json' }; import RecordsWriteUnidentified from '../json-schemas/interface-methods/records-write-unidentified.json' assert { type: 'json' }; @@ -56,6 +57,7 @@ const schemas = { AuthorizationOwner, RecordsDelete, RecordsQuery, + RecordsSubscribe, RecordsWrite, RecordsWriteUnidentified, EventsFilter, diff --git a/json-schemas/interface-methods/protocol-rule-set.json b/json-schemas/interface-methods/protocol-rule-set.json index 72eec5034..db04fd141 100644 --- a/json-schemas/interface-methods/protocol-rule-set.json +++ b/json-schemas/interface-methods/protocol-rule-set.json @@ -66,6 +66,7 @@ "enum": [ "delete", "query", + "subscribe", "read", "update", "write" diff --git a/json-schemas/interface-methods/records-subscribe.json b/json-schemas/interface-methods/records-subscribe.json new file mode 100644 index 000000000..92c7939b0 --- /dev/null +++ b/json-schemas/interface-methods/records-subscribe.json @@ -0,0 +1,44 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://identity.foundation/dwn/json-schemas/records-subscribe.json", + "type": "object", + "additionalProperties": false, + "required": [ + "descriptor" + ], + "properties": { + "authorization": { + "$ref": "https://identity.foundation/dwn/json-schemas/authorization-delegated-grant.json" + }, + "descriptor": { + "type": "object", + "additionalProperties": false, + "required": [ + "interface", + "method", + "messageTimestamp", + "filter" + ], + "properties": { + "interface": { + "enum": [ + "Records" + ], + "type": "string" + }, + "method": { + "enum": [ + "Subscribe" + ], + "type": "string" + }, + "messageTimestamp": { + "$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/definitions/date-time" + }, + "filter": { + "$ref": "https://identity.foundation/dwn/json-schemas/records-filter.json" + } + } + } + } +} \ No newline at end of file diff --git a/json-schemas/permissions/permissions-definitions.json b/json-schemas/permissions/permissions-definitions.json index 985167cff..4e19ef0ab 100644 --- a/json-schemas/permissions/permissions-definitions.json +++ b/json-schemas/permissions/permissions-definitions.json @@ -28,6 +28,9 @@ }, { "$ref": "https://identity.foundation/dwn/json-schemas/permissions/scopes.json#/definitions/records-query-scope" + }, + { + "$ref": "https://identity.foundation/dwn/json-schemas/permissions/scopes.json#/definitions/records-subscribe-scope" } ] }, diff --git a/json-schemas/permissions/scopes.json b/json-schemas/permissions/scopes.json index 5bd34e0ca..af83b84cd 100644 --- a/json-schemas/permissions/scopes.json +++ b/json-schemas/permissions/scopes.json @@ -106,6 +106,24 @@ "type": "string" } } + }, + "records-subscribe-scope": { + "type": "object", + "required": [ + "interface", + "method" + ], + "properties": { + "interface": { + "const": "Records" + }, + "method": { + "const": "Subscribe" + }, + "protocol": { + "type": "string" + } + } } } } \ No newline at end of file diff --git a/src/core/dwn-error.ts b/src/core/dwn-error.ts index 5fe1ae2cc..2a79098e1 100644 --- a/src/core/dwn-error.ts +++ b/src/core/dwn-error.ts @@ -90,7 +90,7 @@ export enum DwnErrorCode { RecordsGrantAuthorizationConditionPublicationProhibited = 'RecordsGrantAuthorizationConditionPublicationProhibited', RecordsGrantAuthorizationConditionPublicationRequired = 'RecordsGrantAuthorizationConditionPublicationRequired', RecordsGrantAuthorizationDeleteProtocolScopeMismatch = 'RecordsGrantAuthorizationDeleteProtocolScopeMismatch', - RecordsGrantAuthorizationQueryProtocolScopeMismatch = 'RecordsGrantAuthorizationQueryProtocolScopeMismatch', + RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch = 'RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch', RecordsGrantAuthorizationScopeContextIdMismatch = 'RecordsGrantAuthorizationScopeContextIdMismatch', RecordsGrantAuthorizationScopeNotProtocol = 'RecordsGrantAuthorizationScopeNotProtocol', RecordsGrantAuthorizationScopeProtocolMismatch = 'RecordsGrantAuthorizationScopeProtocolMismatch', @@ -103,6 +103,8 @@ export enum DwnErrorCode { RecordsQueryFilterMissingRequiredProperties = 'RecordsQueryFilterMissingRequiredProperties', RecordsReadReturnedMultiple = 'RecordsReadReturnedMultiple', RecordsReadAuthorizationFailed = 'RecordsReadAuthorizationFailed', + RecordsSubscribeEventStreamUnimplemented = 'RecordsSubscribeEventStreamUnimplemented', + RecordsSubscribeFilterMissingRequiredProperties = 'RecordsSubscribeFilterMissingRequiredProperties', RecordsSchemasDerivationSchemeMissingSchema = 'RecordsSchemasDerivationSchemeMissingSchema', RecordsValidateIntegrityDelegatedGrantAndIdExistenceMismatch = 'RecordsValidateIntegrityDelegatedGrantAndIdExistenceMismatch', RecordsValidateIntegrityGrantedToAndSignerMismatch = 'RecordsValidateIntegrityGrantedToAndSignerMismatch', diff --git a/src/core/protocol-authorization.ts b/src/core/protocol-authorization.ts index 5ad04fd92..d4be24c65 100644 --- a/src/core/protocol-authorization.ts +++ b/src/core/protocol-authorization.ts @@ -3,6 +3,7 @@ import type { MessageStore } from '../types/message-store.js'; import type { RecordsDelete } from '../interfaces/records-delete.js'; import type { RecordsQuery } from '../interfaces/records-query.js'; import type { RecordsRead } from '../interfaces/records-read.js'; +import type { RecordsSubscribe } from '../interfaces/records-subscribe.js'; import type { RecordsWriteMessage } from '../types/records-types.js'; import type { ProtocolActionRule, ProtocolDefinition, ProtocolRuleSet, ProtocolsConfigureMessage, ProtocolType, ProtocolTypes } from '../types/protocols-types.js'; @@ -152,22 +153,17 @@ export class ProtocolAuthorization { ); } - /** - * Performs protocol-based authorization against the incoming RecordsQuery message. - * @throws {Error} if authorization fails. - */ - public static async authorizeQuery( + public static async authorizeQueryOrSubscribe( tenant: string, - incomingMessage: RecordsQuery, + incomingMessage: RecordsQuery | RecordsSubscribe, messageStore: MessageStore, ): Promise { - // validate that required properties exist in query filter const { protocol, protocolPath, contextId } = incomingMessage.message.descriptor.filter; // fetch the protocol definition const protocolDefinition = await ProtocolAuthorization.fetchProtocolDefinition( tenant, - protocol!, // authorizeQuery` is only called if `protocol` is present + protocol!, // `authorizeQueryOrSubscribe` is only called if `protocol` is present messageStore, ); @@ -192,7 +188,7 @@ export class ProtocolAuthorization { tenant, incomingMessage, inboundMessageRuleSet, - [], // ancestor chain is not relevant to queries + [], // ancestor chain is not relevant to subscriptions messageStore, ); } @@ -423,7 +419,7 @@ export class ProtocolAuthorization { */ private static async verifyInvokedRole( tenant: string, - incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsWrite, + incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsSubscribe | RecordsWrite, protocolUri: string, contextId: string | undefined, protocolDefinition: ProtocolDefinition, @@ -481,7 +477,7 @@ export class ProtocolAuthorization { */ private static async getActionsSeekingARuleMatch( tenant: string, - incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsWrite, + incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsSubscribe | RecordsWrite, messageStore: MessageStore, ): Promise { @@ -495,6 +491,9 @@ export class ProtocolAuthorization { case DwnMethodName.Read: return [ProtocolAction.Read]; + case DwnMethodName.Subscribe: + return [ProtocolAction.Subscribe]; + case DwnMethodName.Write: const incomingRecordsWrite = incomingMessage as RecordsWrite; if (await incomingRecordsWrite.isInitialWrite()) { @@ -519,7 +518,7 @@ export class ProtocolAuthorization { */ private static async verifyAllowedActions( tenant: string, - incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsWrite, + incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsSubscribe | RecordsWrite, inboundMessageRuleSet: ProtocolRuleSet, ancestorMessageChain: RecordsWriteMessage[], messageStore: MessageStore, diff --git a/src/core/records-grant-authorization.ts b/src/core/records-grant-authorization.ts index 6b3259d55..74e69402a 100644 --- a/src/core/records-grant-authorization.ts +++ b/src/core/records-grant-authorization.ts @@ -1,7 +1,7 @@ import type { MessageStore } from '../types/message-store.js'; import type { RecordsPermissionScope } from '../types/permissions-grant-descriptor.js'; import type { PermissionsGrantMessage, RecordsPermissionsGrantMessage } from '../types/permissions-types.js'; -import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsReadMessage, RecordsWriteMessage } from '../types/records-types.js'; +import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsReadMessage, RecordsSubscribeMessage, RecordsWriteMessage } from '../types/records-types.js'; import { GrantAuthorization } from './grant-authorization.js'; import { PermissionsConditionPublication } from '../types/permissions-grant-descriptor.js'; @@ -63,35 +63,35 @@ export class RecordsGrantAuthorization { } /** - * Authorizes the scope of a PermissionsGrant for RecordsQuery. + * Authorizes the scope of a PermissionsGrant for RecordsQuery or RecordsSubscribe. * @param messageStore Used to check if the grant has been revoked. */ - public static async authorizeQuery(input: { - recordsQueryMessage: RecordsQueryMessage, + public static async authorizeQueryOrSubscribe(input: { + incomingMessage: RecordsQueryMessage | RecordsSubscribeMessage, expectedGrantedToInGrant: string, expectedGrantedForInGrant: string, permissionsGrantMessage: PermissionsGrantMessage, messageStore: MessageStore, }): Promise { const { - recordsQueryMessage, expectedGrantedToInGrant, expectedGrantedForInGrant, permissionsGrantMessage, messageStore + incomingMessage, expectedGrantedToInGrant, expectedGrantedForInGrant, permissionsGrantMessage, messageStore } = input; await GrantAuthorization.performBaseValidation({ - incomingMessage: recordsQueryMessage, + incomingMessage, expectedGrantedToInGrant, expectedGrantedForInGrant, permissionsGrantMessage, messageStore }); - // If the grant specifies a protocol, the query must specify the same protocol. - const protocolInGrant = (permissionsGrantMessage as RecordsPermissionsGrantMessage).descriptor.scope.protocol; - const protocolInQuery = recordsQueryMessage.descriptor.filter.protocol; - if (protocolInGrant !== undefined && protocolInQuery !== protocolInGrant) { + // If the grant specifies a protocol, the subscribe or query must specify the same protocol. + const protocolInGrant = (permissionsGrantMessage.descriptor.scope as RecordsPermissionScope).protocol; + const protocolInMessage = incomingMessage.descriptor.filter.protocol; + if (protocolInGrant !== undefined && protocolInMessage !== protocolInGrant) { throw new DwnError( - DwnErrorCode.RecordsGrantAuthorizationQueryProtocolScopeMismatch, - `Grant protocol scope ${protocolInGrant} does not match protocol in query ${protocolInQuery}` + DwnErrorCode.RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch, + `Grant protocol scope ${protocolInGrant} does not match protocol in message ${protocolInMessage}` ); } } diff --git a/src/dwn.ts b/src/dwn.ts index efab692d7..79d1021fd 100644 --- a/src/dwn.ts +++ b/src/dwn.ts @@ -3,14 +3,15 @@ import type { EventLog } from './types/event-log.js'; import type { EventStream } from './types/subscriptions.js'; import type { MessageStore } from './types/message-store.js'; import type { MethodHandler } from './types/method-handler.js'; +import type { Readable } from 'readable-stream'; import type { TenantGate } from './core/tenant-gate.js'; import type { UnionMessageReply } from './core/message-reply.js'; import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply } from './types/events-types.js'; -import type { GenericMessage, GenericMessageReply, MessageOptions } from './types/message-types.js'; +import type { GenericMessage, GenericMessageReply, MessageSubscriptionHandler } from './types/message-types.js'; import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js'; import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from './types/permissions-types.js'; import type { ProtocolsConfigureMessage, ProtocolsQueryMessage, ProtocolsQueryReply } from './types/protocols-types.js'; -import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsReadMessage, RecordsReadReply, RecordsWriteMessage, RecordsWriteMessageOptions } from './types/records-types.js'; +import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsReadMessage, RecordsReadReply, RecordsSubscribeMessage, RecordsSubscribeMessageOptions, RecordsSubscribeReply, RecordSubscriptionHandler, RecordsWriteMessage, RecordsWriteMessageOptions } from './types/records-types.js'; import { AllowAllTenantGate } from './core/tenant-gate.js'; import { DidResolver } from './did/did-resolver.js'; @@ -28,6 +29,7 @@ import { ProtocolsQueryHandler } from './handlers/protocols-query.js'; import { RecordsDeleteHandler } from './handlers/records-delete.js'; import { RecordsQueryHandler } from './handlers/records-query.js'; import { RecordsReadHandler } from './handlers/records-read.js'; +import { RecordsSubscribeHandler } from './handlers/records-subscribe.js'; import { RecordsWriteHandler } from './handlers/records-write.js'; import { DwnInterfaceName, DwnMethodName } from './enums/dwn-interface-method.js'; @@ -43,6 +45,7 @@ export class Dwn { private constructor(config: DwnConfig) { this.didResolver = config.didResolver!; this.tenantGate = config.tenantGate!; + this.eventStream = config.eventStream!; this.messageStore = config.messageStore; this.dataStore = config.dataStore; this.eventLog = config.eventLog; @@ -112,6 +115,11 @@ export class Dwn { this.messageStore, this.dataStore ), + [DwnInterfaceName.Records + DwnMethodName.Subscribe]: new RecordsSubscribeHandler( + this.didResolver, + this.messageStore, + this.eventStream + ), [DwnInterfaceName.Records + DwnMethodName.Write]: new RecordsWriteHandler( this.didResolver, this.messageStore, @@ -165,6 +173,8 @@ export class Dwn { public async processMessage(tenant: string, rawMessage: PermissionsRevokeMessage): Promise; public async processMessage(tenant: string, rawMessage: RecordsDeleteMessage): Promise; public async processMessage(tenant: string, rawMessage: RecordsQueryMessage): Promise; + public async processMessage( + tenant: string, rawMessage: RecordsSubscribeMessage, options: RecordsSubscribeMessageOptions): Promise; public async processMessage(tenant: string, rawMessage: RecordsReadMessage): Promise; public async processMessage(tenant: string, rawMessage: RecordsWriteMessage, options?: RecordsWriteMessageOptions): Promise; public async processMessage(tenant: string, rawMessage: unknown, options?: MessageOptions): Promise; @@ -233,6 +243,14 @@ export class Dwn { } }; +/** + * MessageOptions that are used when processing a message. + */ +export interface MessageOptions { + dataStream?: Readable; + subscriptionHandler?: MessageSubscriptionHandler | RecordSubscriptionHandler; +}; + /** * DWN configuration. */ diff --git a/src/handlers/events-subscribe.ts b/src/handlers/events-subscribe.ts index 8bad408c1..bba6c481e 100644 --- a/src/handlers/events-subscribe.ts +++ b/src/handlers/events-subscribe.ts @@ -65,4 +65,4 @@ export class EventsSubscribeHandler implements MethodHandler { subscription, }; } -} \ No newline at end of file +} diff --git a/src/handlers/records-query.ts b/src/handlers/records-query.ts index 1c4aea4b0..1771b76f8 100644 --- a/src/handlers/records-query.ts +++ b/src/handlers/records-query.ts @@ -35,7 +35,7 @@ export class RecordsQueryHandler implements MethodHandler { let recordsWrites: RecordsQueryReplyEntry[]; let cursor: PaginationCursor | undefined; // if this is an anonymous query and the filter supports published records, query only published records - if (RecordsQueryHandler.filterIncludesPublishedRecords(recordsQuery) && recordsQuery.author === undefined) { + if (Records.filterIncludesPublishedRecords(recordsQuery.message.descriptor.filter) && recordsQuery.author === undefined) { const results = await this.fetchPublishedRecords(tenant, recordsQuery); recordsWrites = results.messages as RecordsQueryReplyEntry[]; cursor = results.cursor; @@ -144,13 +144,13 @@ export class RecordsQueryHandler implements MethodHandler { private async fetchRecordsAsNonOwner( tenant: string, recordsQuery: RecordsQuery ): Promise<{ messages: GenericMessage[], cursor?: PaginationCursor }> { - const { dateSort, pagination } = recordsQuery.message.descriptor; + const { dateSort, pagination, filter } = recordsQuery.message.descriptor; const filters = []; - if (RecordsQueryHandler.filterIncludesPublishedRecords(recordsQuery)) { + if (Records.filterIncludesPublishedRecords(filter)) { filters.push(RecordsQueryHandler.buildPublishedRecordsFilter(recordsQuery)); } - if (RecordsQueryHandler.filterIncludesUnpublishedRecords(recordsQuery)) { + if (Records.filterIncludesUnpublishedRecords(filter)) { filters.push(RecordsQueryHandler.buildUnpublishedRecordsByQueryAuthorFilter(recordsQuery)); const recipientFilter = recordsQuery.message.descriptor.filter.recipient; @@ -158,7 +158,7 @@ export class RecordsQueryHandler implements MethodHandler { filters.push(RecordsQueryHandler.buildUnpublishedRecordsForQueryAuthorFilter(recordsQuery)); } - if (RecordsQueryHandler.shouldProtocolAuthorizeQuery(recordsQuery)) { + if (Records.shouldProtocolAuthorize(recordsQuery.signaturePayload!)) { filters.push(RecordsQueryHandler.buildUnpublishedProtocolAuthorizedRecordsFilter(recordsQuery)); } } @@ -238,34 +238,6 @@ export class RecordsQueryHandler implements MethodHandler { }; } - /** - * Determines if ProtocolAuthorization.authorizeQuery should be run and if the corresponding filter should be used. - */ - private static shouldProtocolAuthorizeQuery(recordsQuery: RecordsQuery): boolean { - return recordsQuery.signaturePayload!.protocolRole !== undefined; - } - - /** - * Checks if the recordQuery filter supports returning published records. - */ - private static filterIncludesPublishedRecords(recordsQuery: RecordsQuery): boolean { - const { filter } = recordsQuery.message.descriptor; - // When `published` and `datePublished` range are both undefined, published records can be returned. - return filter.datePublished !== undefined || filter.published !== false; - } - - /** - * Checks if the recordQuery filter supports returning unpublished records. - */ - private static filterIncludesUnpublishedRecords(recordsQuery: RecordsQuery): boolean { - const { filter } = recordsQuery.message.descriptor; - // When `published` and `datePublished` range are both undefined, unpublished records can be returned. - if (filter.datePublished === undefined && filter.published === undefined) { - return true; - } - return filter.published === false; - } - /** * @param messageStore Used to check if the grant has been revoked. */ @@ -280,8 +252,8 @@ export class RecordsQueryHandler implements MethodHandler { } // Only run protocol authz if message deliberately invokes it - if (RecordsQueryHandler.shouldProtocolAuthorizeQuery(recordsQuery)) { - await ProtocolAuthorization.authorizeQuery(tenant, recordsQuery, messageStore); + if (Records.shouldProtocolAuthorize(recordsQuery.signaturePayload!)) { + await ProtocolAuthorization.authorizeQueryOrSubscribe(tenant, recordsQuery, messageStore); } } } diff --git a/src/handlers/records-subscribe.ts b/src/handlers/records-subscribe.ts new file mode 100644 index 000000000..d1d33bda1 --- /dev/null +++ b/src/handlers/records-subscribe.ts @@ -0,0 +1,215 @@ +import type { DidResolver } from '../did/did-resolver.js'; +import type { Filter } from '../types/query-types.js'; +import type { MessageStore } from '../types//message-store.js'; +import type { MethodHandler } from '../types/method-handler.js'; +import type { EventListener, EventStream } from '../types/subscriptions.js'; +import type { RecordsDeleteMessage, RecordsSubscribeMessage, RecordsSubscribeReply, RecordSubscriptionHandler, RecordsWriteMessage } from '../types/records-types.js'; + +import { authenticate } from '../core/auth.js'; +import { FilterUtility } from '../utils/filter.js'; +import { Message } from '../core/message.js'; +import { messageReplyFromError } from '../core/message-reply.js'; +import { ProtocolAuthorization } from '../core/protocol-authorization.js'; +import { Records } from '../utils/records.js'; +import { RecordsSubscribe } from '../interfaces/records-subscribe.js'; +import { DwnError, DwnErrorCode } from '../core/dwn-error.js'; +import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js'; + +export class RecordsSubscribeHandler implements MethodHandler { + + constructor(private didResolver: DidResolver, private messageStore: MessageStore, private eventStream?: EventStream) { } + + public async handle({ + tenant, + message, + subscriptionHandler + }: { + tenant: string, + message: RecordsSubscribeMessage, + subscriptionHandler: RecordSubscriptionHandler, + }): Promise { + if (this.eventStream === undefined) { + return messageReplyFromError(new DwnError( + DwnErrorCode.RecordsSubscribeEventStreamUnimplemented, + 'Subscriptions are not supported' + ), 501); + } + + let recordsSubscribe: RecordsSubscribe; + try { + recordsSubscribe = await RecordsSubscribe.parse(message); + } catch (e) { + return messageReplyFromError(e, 400); + } + + let filters:Filter[] = []; + // if this is an anonymous subscribe and the filter supports published records, subscribe to only published records + if (Records.filterIncludesPublishedRecords(recordsSubscribe.message.descriptor.filter) && recordsSubscribe.author === undefined) { + // build filters for a stream of published records + filters = [ RecordsSubscribeHandler.buildPublishedRecordsFilter(recordsSubscribe) ]; + // delete the undefined authorization property else the code will encounter the following IPLD issue when attempting to generate CID: + // Error: `undefined` is not supported by the IPLD Data Model and cannot be encoded + delete message.authorization; + } else { + // authentication and authorization + try { + await authenticate(message.authorization!, this.didResolver); + await RecordsSubscribeHandler.authorizeRecordsSubscribe(tenant, recordsSubscribe, this.messageStore); + } catch (error) { + return messageReplyFromError(error, 401); + } + + if (recordsSubscribe.author === tenant) { + // if the subscribe author is the tenant, filter as owner. + filters = await RecordsSubscribeHandler.filterAsOwner(recordsSubscribe); + } else { + // otherwise build filters based on published records, permissions, or protocol rules + filters = await RecordsSubscribeHandler.filterAsNonOwner(recordsSubscribe); + } + } + + const listener: EventListener = (eventTenant, eventMessage, eventIndexes):void => { + if (tenant === eventTenant && FilterUtility.matchAnyFilter(eventIndexes, filters)) { + // the filters check for interface and method + // if matched the messages are either a `RecordsWriteMessage` or `RecordsDeleteMessage` + subscriptionHandler(eventMessage as RecordsWriteMessage | RecordsDeleteMessage); + } + }; + + const messageCid = await Message.getCid(message); + const subscription = await this.eventStream.subscribe(messageCid, listener); + return { + status: { code: 200, detail: 'OK' }, + subscription + }; + } + + /** + * Subscribe to records as the owner of the DWN with no additional filtering. + */ + private static async filterAsOwner(RecordsSubscribe: RecordsSubscribe): Promise { + const { filter } = RecordsSubscribe.message.descriptor; + + const subscribeFilter = { + ...Records.convertFilter(filter), + interface : DwnInterfaceName.Records, + method : [ DwnMethodName.Write, DwnMethodName.Delete ], // we filter for both write and delete so that subscriber can update state. + }; + + return [ subscribeFilter ]; + } + + /** + * Creates filters in order to subscribe to records as a non-owner. + * + * Filters can support emitting messages for both published and unpublished records, + * as well as explicitly only published or only unpublished records. + * + * A) BOTH published and unpublished: + * 1. published records; and + * 2. unpublished records intended for the subscription author (where `recipient` is the subscription author); and + * 3. unpublished records authorized by a protocol rule. + * + * B) PUBLISHED: + * 1. only published records; + * + * C) UNPUBLISHED: + * 1. unpublished records intended for the subscription author (where `recipient` is the subscription author); and + * 2. unpublished records authorized by a protocol rule. + */ + private static async filterAsNonOwner( + recordsSubscribe: RecordsSubscribe + ): Promise { + const filters:Filter[] = []; + const { filter } = recordsSubscribe.message.descriptor; + if (Records.filterIncludesPublishedRecords(filter)) { + filters.push(RecordsSubscribeHandler.buildPublishedRecordsFilter(recordsSubscribe)); + } + + if (Records.filterIncludesUnpublishedRecords(filter)) { + filters.push(RecordsSubscribeHandler.buildUnpublishedRecordsBySubscribeAuthorFilter(recordsSubscribe)); + + const recipientFilter = recordsSubscribe.message.descriptor.filter.recipient; + if (recipientFilter === undefined || recipientFilter === recordsSubscribe.author) { + filters.push(RecordsSubscribeHandler.buildUnpublishedRecordsForSubscribeAuthorFilter(recordsSubscribe)); + } + + if (Records.shouldProtocolAuthorize(recordsSubscribe.signaturePayload!)) { + filters.push(RecordsSubscribeHandler.buildUnpublishedProtocolAuthorizedRecordsFilter(recordsSubscribe)); + } + } + return filters; + } + + /** + * Creates a filter for all published records matching the subscribe + */ + private static buildPublishedRecordsFilter(recordsSubscribe: RecordsSubscribe): Filter { + return { + ...Records.convertFilter(recordsSubscribe.message.descriptor.filter), + interface : DwnInterfaceName.Records, + method : [ DwnMethodName.Write, DwnMethodName.Delete ], + published : true, + }; + } + + /** + * Creates a filter for unpublished records that are intended for the subscribe author (where `recipient` is the author). + */ + private static buildUnpublishedRecordsForSubscribeAuthorFilter(recordsSubscribe: RecordsSubscribe): Filter { + // include records where recipient is subscribe author + return { + ...Records.convertFilter(recordsSubscribe.message.descriptor.filter), + interface : DwnInterfaceName.Records, + method : [ DwnMethodName.Write, DwnMethodName.Delete ], + recipient : recordsSubscribe.author!, + published : false + }; + } + + /** + * Creates a filter for unpublished records that are within the specified protocol. + * Validation that `protocol` and other required protocol-related fields occurs before this method. + */ + private static buildUnpublishedProtocolAuthorizedRecordsFilter(recordsSubscribe: RecordsSubscribe): Filter { + return { + ...Records.convertFilter(recordsSubscribe.message.descriptor.filter), + interface : DwnInterfaceName.Records, + method : [ DwnMethodName.Write, DwnMethodName.Delete ], + published : false + }; + } + + /** + * Creates a filter for only unpublished records where the author is the same as the subscribe author. + */ + private static buildUnpublishedRecordsBySubscribeAuthorFilter(recordsSubscribe: RecordsSubscribe): Filter { + // include records where author is the same as the subscribe author + return { + ...Records.convertFilter(recordsSubscribe.message.descriptor.filter), + author : recordsSubscribe.author!, + interface : DwnInterfaceName.Records, + method : [ DwnMethodName.Write, DwnMethodName.Delete ], + published : false + }; + } + + /** + * @param messageStore Used to check if the grant has been revoked. + */ + public static async authorizeRecordsSubscribe( + tenant: string, + recordsSubscribe: RecordsSubscribe, + messageStore: MessageStore + ): Promise { + + if (Message.isSignedByDelegate(recordsSubscribe.message)) { + await recordsSubscribe.authorizeDelegate(messageStore); + } + + // Only run protocol authz if message deliberately invokes it + if (Records.shouldProtocolAuthorize(recordsSubscribe.signaturePayload!)) { + await ProtocolAuthorization.authorizeQueryOrSubscribe(tenant, recordsSubscribe, messageStore); + } + } +} diff --git a/src/index.ts b/src/index.ts index c5d5cf728..253ac411f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,13 +4,13 @@ export type { DidMethodResolver, DwnServiceEndpoint, ServiceEndpoint, DidDocumen export type { EventLog } from './types/event-log.js'; export type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeDescriptor, EventsSubscribeMessage, EventsSubscribeReply } from './types/events-types.js'; export type { EventStream, SubscriptionReply } from './types/subscriptions.js'; -export type { GenericMessage, GenericMessageReply, MessageSort, Pagination, QueryResultEntry } from './types/message-types.js'; +export type { GenericMessage, GenericMessageReply, MessageSort, MessageSubscription, MessageSubscriptionHandler, Pagination, QueryResultEntry } from './types/message-types.js'; export type { MessagesGetMessage, MessagesGetReply, MessagesGetReplyEntry } from './types/messages-types.js'; export type { Filter, EqualFilter, OneOfFilter, RangeFilter, RangeCriterion, PaginationCursor, QueryOptions } from './types/query-types.js'; export type { PermissionConditions, PermissionScope, PermissionsGrantDescriptor } from './types/permissions-grant-descriptor.js'; export type { PermissionsGrantMessage, PermissionsRequestDescriptor, PermissionsRequestMessage, PermissionsRevokeDescriptor, PermissionsRevokeMessage } from './types/permissions-types.js'; export type { ProtocolsConfigureDescriptor, ProtocolDefinition, ProtocolTypes, ProtocolRuleSet, ProtocolsQueryFilter, ProtocolsConfigureMessage, ProtocolsQueryMessage, ProtocolsQueryReply } from './types/protocols-types.js'; -export type { EncryptionProperty, RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsQueryReplyEntry, RecordsReadReply, RecordsWriteDescriptor, RecordsWriteMessage } from './types/records-types.js'; +export type { EncryptionProperty, RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsQueryReplyEntry, RecordsReadReply, RecordsSubscribeDescriptor, RecordsSubscribeMessage, RecordsWriteDescriptor, RecordsWriteMessage } from './types/records-types.js'; export { authenticate } from './core/auth.js'; export { ActiveTenantCheckResult, AllowAllTenantGate, TenantGate } from './core/tenant-gate.js'; export { Cid } from './utils/cid.js'; @@ -28,9 +28,9 @@ export { DwnError, DwnErrorCode } from './core/dwn-error.js'; export { DwnInterfaceName, DwnMethodName } from './enums/dwn-interface-method.js'; export { Encoder } from './utils/encoder.js'; export { EventsGet, EventsGetOptions } from './interfaces/events-get.js'; +export { EventsSubscribe, EventsSubscribeOptions } from './interfaces/events-subscribe.js'; export { Encryption, EncryptionAlgorithm } from './utils/encryption.js'; export { EncryptionInput, KeyEncryptionInput, RecordsWrite, RecordsWriteOptions, CreateFromOptions } from './interfaces/records-write.js'; -export { EventsSubscribe , EventsSubscribeOptions } from './interfaces/events-subscribe.js'; export { executeUnlessAborted } from './utils/abort.js'; export { Jws } from './utils/jws.js'; export { KeyMaterial, PrivateJwk, PublicJwk } from './types/jose-types.js'; @@ -48,6 +48,7 @@ export { ProtocolsQuery, ProtocolsQueryOptions } from './interfaces/protocols-qu export { Records } from './utils/records.js'; export { RecordsDelete, RecordsDeleteOptions } from './interfaces/records-delete.js'; export { RecordsRead, RecordsReadOptions } from './interfaces/records-read.js'; +export { RecordsSubscribe, RecordsSubscribeOptions } from './interfaces/records-subscribe.js'; export { Secp256k1 } from './utils/secp256k1.js'; export { Signer } from './types/signer.js'; export { SortDirection } from './types/query-types.js'; diff --git a/src/interfaces/records-query.ts b/src/interfaces/records-query.ts index c935ddcbe..3f9a84be1 100644 --- a/src/interfaces/records-query.ts +++ b/src/interfaces/records-query.ts @@ -120,8 +120,8 @@ export class RecordsQuery extends AbstractMessage { */ public async authorizeDelegate(messageStore: MessageStore): Promise { const delegatedGrant = this.message.authorization!.authorDelegatedGrant!; - await RecordsGrantAuthorization.authorizeQuery({ - recordsQueryMessage : this.message, + await RecordsGrantAuthorization.authorizeQueryOrSubscribe({ + incomingMessage : this.message, expectedGrantedToInGrant : this.signer!, expectedGrantedForInGrant : this.author!, permissionsGrantMessage : delegatedGrant, diff --git a/src/interfaces/records-subscribe.ts b/src/interfaces/records-subscribe.ts new file mode 100644 index 000000000..0ec77ffe2 --- /dev/null +++ b/src/interfaces/records-subscribe.ts @@ -0,0 +1,104 @@ +import type { DelegatedGrantMessage } from '../types/delegated-grant-message.js'; +import type { MessageStore } from '../types/message-store.js'; +import type { Signer } from '../types/signer.js'; +import type { RecordsFilter, RecordsSubscribeDescriptor, RecordsSubscribeMessage } from '../types/records-types.js'; + +import { AbstractMessage } from '../core/abstract-message.js'; +import { Message } from '../core/message.js'; +import { Records } from '../utils/records.js'; +import { RecordsGrantAuthorization } from '../core/records-grant-authorization.js'; +import { removeUndefinedProperties } from '../utils/object.js'; +import { Time } from '../utils/time.js'; +import { DwnError, DwnErrorCode } from '../core/dwn-error.js'; +import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js'; +import { validateProtocolUrlNormalized, validateSchemaUrlNormalized } from '../utils/url.js'; + +export type RecordsSubscribeOptions = { + messageTimestamp?: string; + filter: RecordsFilter; + signer?: Signer; + protocolRole?: string; + + /** + * The delegated grant to sign on behalf of the logical author, which is the grantor (`grantedBy`) of the delegated grant. + */ + delegatedGrant?: DelegatedGrantMessage; +}; + +/** + * A class representing a RecordsSubscribe DWN message. + */ +export class RecordsSubscribe extends AbstractMessage { + + public static async parse(message: RecordsSubscribeMessage): Promise { + let signaturePayload; + if (message.authorization !== undefined) { + signaturePayload = await Message.validateSignatureStructure(message.authorization.signature, message.descriptor); + } + + Records.validateDelegatedGrantReferentialIntegrity(message, signaturePayload); + + if (signaturePayload?.protocolRole !== undefined) { + if (message.descriptor.filter.protocolPath === undefined) { + throw new DwnError( + DwnErrorCode.RecordsSubscribeFilterMissingRequiredProperties, + 'Role-authorized subscriptions must include `protocolPath` in the filter' + ); + } + } + if (message.descriptor.filter.protocol !== undefined) { + validateProtocolUrlNormalized(message.descriptor.filter.protocol); + } + if (message.descriptor.filter.schema !== undefined) { + validateSchemaUrlNormalized(message.descriptor.filter.schema); + } + Time.validateTimestamp(message.descriptor.messageTimestamp); + + return new RecordsSubscribe(message); + } + + public static async create(options: RecordsSubscribeOptions): Promise { + const descriptor: RecordsSubscribeDescriptor = { + interface : DwnInterfaceName.Records, + method : DwnMethodName.Subscribe, + messageTimestamp : options.messageTimestamp ?? Time.getCurrentTimestamp(), + filter : Records.normalizeFilter(options.filter), + }; + + // delete all descriptor properties that are `undefined` else the code will encounter the following IPLD issue when attempting to generate CID: + // Error: `undefined` is not supported by the IPLD Data Model and cannot be encoded + removeUndefinedProperties(descriptor); + + // only generate the `authorization` property if signature input is given + const signer = options.signer; + let authorization; + if (signer) { + authorization = await Message.createAuthorization({ + descriptor, + signer, + protocolRole : options.protocolRole, + delegatedGrant : options.delegatedGrant + }); + } + const message = { descriptor, authorization }; + + Message.validateJsonSchema(message); + + return new RecordsSubscribe(message); + } + + /** + * Authorizes the delegate who signed the message. + * @param messageStore Used to check if the grant has been revoked. + */ + public async authorizeDelegate(messageStore: MessageStore): Promise { + const delegatedGrant = this.message.authorization!.authorDelegatedGrant!; + await RecordsGrantAuthorization.authorizeQueryOrSubscribe({ + incomingMessage : this.message, + expectedGrantedToInGrant : this.signer!, + expectedGrantedForInGrant : this.author!, + permissionsGrantMessage : delegatedGrant, + messageStore + }); + } +} diff --git a/src/types/message-types.ts b/src/types/message-types.ts index 122083b40..c3dfe747d 100644 --- a/src/types/message-types.ts +++ b/src/types/message-types.ts @@ -1,6 +1,5 @@ import type { DelegatedGrantMessage } from '../types/delegated-grant-message.js'; import type { GeneralJws } from './jws-types.js'; -import type { Readable } from 'readable-stream'; import type { PaginationCursor, SortDirection } from './query-types.js'; /** @@ -11,14 +10,6 @@ export type GenericMessage = { authorization?: AuthorizationModel; }; -/** - * MessageOptions that are used when processing a message. - */ -export type MessageOptions = { - dataStream?: Readable; - subscriptionHandler?: MessageSubscriptionHandler; -}; - /** * The data model for the `authorization` property in a DWN message. */ diff --git a/src/types/method-handler.ts b/src/types/method-handler.ts index 7a3accdfc..1959acbfa 100644 --- a/src/types/method-handler.ts +++ b/src/types/method-handler.ts @@ -1,4 +1,5 @@ import type { Readable } from 'readable-stream'; +import type { RecordSubscriptionHandler } from './records-types.js'; import type { GenericMessage, GenericMessageReply, MessageSubscriptionHandler } from './message-types.js'; /** @@ -12,6 +13,6 @@ export interface MethodHandler { tenant: string; message: GenericMessage; dataStream?: Readable - subscriptionHandler?: MessageSubscriptionHandler; + subscriptionHandler?: MessageSubscriptionHandler | RecordSubscriptionHandler; }): Promise; } \ No newline at end of file diff --git a/src/types/records-types.ts b/src/types/records-types.ts index 18ea8211b..cc146486a 100644 --- a/src/types/records-types.ts +++ b/src/types/records-types.ts @@ -3,7 +3,7 @@ import type { GeneralJws } from './jws-types.js'; import type { KeyDerivationScheme } from '../utils/hd-key.js'; import type { PublicJwk } from './jose-types.js'; import type { Readable } from 'readable-stream'; -import type { AuthorizationModel, GenericMessage, GenericMessageReply, GenericSignaturePayload, Pagination } from './message-types.js'; +import type { AuthorizationModel, GenericMessage, GenericMessageReply, GenericSignaturePayload, MessageSubscription, Pagination } from './message-types.js'; import type { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js'; import type { PaginationCursor, RangeCriterion, RangeFilter } from './query-types.js'; @@ -106,6 +106,13 @@ export type RecordsQueryDescriptor = { pagination?: Pagination; }; +export type RecordsSubscribeDescriptor = { + interface: DwnInterfaceName.Records; + method: DwnMethodName.Subscribe; + messageTimestamp: string; + filter: RecordsFilter; +}; + export type RecordsFilter = { /**the logical author of the record */ author?: string; @@ -146,6 +153,20 @@ export type RecordsQueryReply = GenericMessageReply & { cursor?: PaginationCursor; }; +export type RecordSubscriptionHandler = (message: RecordsWriteMessage | RecordsDeleteMessage) => void; + +export type RecordsSubscribeMessageOptions = { + subscriptionHandler: RecordSubscriptionHandler; +}; + +export type RecordsSubscribeMessage = GenericMessage & { + descriptor: RecordsSubscribeDescriptor; +}; + +export type RecordsSubscribeReply = GenericMessageReply & { + subscription?: MessageSubscription; +}; + export type RecordsReadMessage = { authorization?: AuthorizationModel; descriptor: RecordsReadDescriptor; @@ -178,4 +199,4 @@ export type RecordsDeleteDescriptor = { method: DwnMethodName.Delete; recordId: string; messageTimestamp: string; -}; +}; \ No newline at end of file diff --git a/src/types/subscriptions.ts b/src/types/subscriptions.ts index 72ea93a39..29037b0d6 100644 --- a/src/types/subscriptions.ts +++ b/src/types/subscriptions.ts @@ -21,4 +21,4 @@ export interface EventSubscription { export type SubscriptionReply = GenericMessageReply & { subscription?: MessageSubscription; -}; \ No newline at end of file +}; diff --git a/src/utils/records.ts b/src/utils/records.ts index f998f0d55..8dd05fc99 100644 --- a/src/utils/records.ts +++ b/src/utils/records.ts @@ -2,7 +2,7 @@ import type { DerivedPrivateJwk } from './hd-key.js'; import type { Filter } from '../types/query-types.js'; import type { GenericSignaturePayload } from '../types/message-types.js'; import type { Readable } from 'readable-stream'; -import type { RecordsDeleteMessage, RecordsFilter, RecordsQueryMessage, RecordsReadMessage, RecordsWriteDescriptor, RecordsWriteMessage } from '../types/records-types.js'; +import type { RecordsDeleteMessage, RecordsFilter, RecordsQueryMessage, RecordsReadMessage, RecordsSubscribeMessage, RecordsWriteDescriptor, RecordsWriteMessage } from '../types/records-types.js'; import { DateSort } from '../types/records-types.js'; import { Encoder } from './encoder.js'; @@ -289,7 +289,7 @@ export class Records { * Usage of this property is purely for performance optimization so we don't have to decode the signature payload again. */ public static validateDelegatedGrantReferentialIntegrity( - message: RecordsReadMessage | RecordsQueryMessage | RecordsWriteMessage | RecordsDeleteMessage, + message: RecordsReadMessage | RecordsQueryMessage | RecordsWriteMessage | RecordsDeleteMessage | RecordsSubscribeMessage, signaturePayload: GenericSignaturePayload | undefined ): void { // `deletedGrantId` in the payload of the message signature and `authorDelegatedGrant` in `authorization` must both exist or be both undefined @@ -316,4 +316,30 @@ export class Records { } } } + + /** + * Determines if signature payload contains a protocolRole and should be authorized as such. + */ + static shouldProtocolAuthorize(signaturePayload: GenericSignaturePayload): boolean { + return signaturePayload.protocolRole !== undefined; + } + + /** + * Checks if the filter supports returning published records. + */ + static filterIncludesPublishedRecords(filter: RecordsFilter): boolean { + // When `published` and `datePublished` range are both undefined, published records can be returned. + return filter.datePublished !== undefined || filter.published !== false; + } + + /** + * Checks if the filter supports returning unpublished records. + */ + static filterIncludesUnpublishedRecords(filter: RecordsFilter): boolean { + // When `published` and `datePublished` range are both undefined, unpublished records can be returned. + if (filter.datePublished === undefined && filter.published === undefined) { + return true; + } + return filter.published === false; + } } diff --git a/tests/dwn.spec.ts b/tests/dwn.spec.ts index 3ad69d3bc..fef8e8e35 100644 --- a/tests/dwn.spec.ts +++ b/tests/dwn.spec.ts @@ -162,12 +162,14 @@ export function testDwnClass(): void { const messageStoreStub = stubInterface(); const dataStoreStub = stubInterface(); const eventLogStub = stubInterface(); + const eventStreamStub = stubInterface(); const dwnWithConfig = await Dwn.create({ tenantGate : blockAllTenantGate, messageStore : messageStoreStub, dataStore : dataStoreStub, - eventLog : eventLogStub + eventLog : eventLogStub, + eventStream : eventStreamStub }); const alice = await DidKeyResolver.generate(); diff --git a/tests/handlers/events-subscribe.spec.ts b/tests/handlers/events-subscribe.spec.ts index 9f52fc1bd..96e6c4ae8 100644 --- a/tests/handlers/events-subscribe.spec.ts +++ b/tests/handlers/events-subscribe.spec.ts @@ -73,7 +73,6 @@ export function testEventsSubscribeHandler(): void { expect(subscriptionMessageReply.status.code).to.equal(501, subscriptionMessageReply.status.detail); expect(subscriptionMessageReply.status.detail).to.include(DwnErrorCode.EventsSubscribeEventStreamUnimplemented); }); - }); describe('EventStream enabled', () => { diff --git a/tests/handlers/protocols-configure.spec.ts b/tests/handlers/protocols-configure.spec.ts index 7d56d2dc8..2dd03c51a 100644 --- a/tests/handlers/protocols-configure.spec.ts +++ b/tests/handlers/protocols-configure.spec.ts @@ -48,7 +48,6 @@ export function testProtocolsConfigureHandler(): void { messageStore = stores.messageStore; dataStore = stores.dataStore; eventLog = stores.eventLog; - eventStream = TestEventStream.get(); dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); diff --git a/tests/handlers/records-query.spec.ts b/tests/handlers/records-query.spec.ts index 0be0ead9d..ae660895d 100644 --- a/tests/handlers/records-query.spec.ts +++ b/tests/handlers/records-query.spec.ts @@ -50,7 +50,6 @@ export function testRecordsQueryHandler(): void { messageStore = stores.messageStore; dataStore = stores.dataStore; eventLog = stores.eventLog; - eventStream = TestEventStream.get(); dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); @@ -2030,8 +2029,7 @@ export function testRecordsQueryHandler(): void { }); it('allows $contextRole authorized queries', async () => { - // scenario: Alice writes some chat messages writes a chat message. Bob invokes his - // friend role in order to query the chat message. + // scenario: Alice writes some chat messages. Bob invokes his friend role in order to query the chat messages. const alice = await DidKeyResolver.generate(); const bob = await DidKeyResolver.generate(); diff --git a/tests/handlers/records-read.spec.ts b/tests/handlers/records-read.spec.ts index 5ecb04137..e0807e56c 100644 --- a/tests/handlers/records-read.spec.ts +++ b/tests/handlers/records-read.spec.ts @@ -57,7 +57,6 @@ export function testRecordsReadHandler(): void { messageStore = stores.messageStore; dataStore = stores.dataStore; eventLog = stores.eventLog; - eventStream = TestEventStream.get(); dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); diff --git a/tests/handlers/records-subscribe.spec.ts b/tests/handlers/records-subscribe.spec.ts new file mode 100644 index 000000000..a820a65d1 --- /dev/null +++ b/tests/handlers/records-subscribe.spec.ts @@ -0,0 +1,704 @@ +import type { EventStream } from '../../src/types/subscriptions.js'; +import type { GenericMessage } from '../../src/types/message-types.js'; +import type { DataStore, EventLog, MessageStore, RecordsWriteMessage } from '../../src/index.js'; +import type { RecordsDeleteMessage, RecordsFilter, RecordSubscriptionHandler } from '../../src/types/records-types.js'; + +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import chai, { expect } from 'chai'; + +import friendRoleProtocolDefinition from '../vectors/protocol-definitions/friend-role.json' assert { type: 'json' }; +import threadRoleProtocolDefinition from '../vectors/protocol-definitions/thread-role.json' assert { type: 'json' }; + +import { DidKeyResolver } from '../../src/did/did-key-resolver.js'; +import { Jws } from '../../src/utils/jws.js'; +import { Message } from '../../src/core/message.js'; +import { RecordsSubscribe } from '../../src/interfaces/records-subscribe.js'; +import { RecordsSubscribeHandler } from '../../src/handlers/records-subscribe.js'; +import { stubInterface } from 'ts-sinon'; +import { TestDataGenerator } from '../utils/test-data-generator.js'; +import { TestEventStream } from '../test-event-stream.js'; +import { TestStores } from '../test-stores.js'; +import { TestStubGenerator } from '../utils/test-stub-generator.js'; +import { DidResolver, Dwn, Time } from '../../src/index.js'; +import { DwnErrorCode, DwnInterfaceName, DwnMethodName } from '../../src/index.js'; + +chai.use(chaiAsPromised); + +export function testRecordsSubscribeHandler(): void { + describe('RecordsSubscribeHandler.handle()', () => { + describe('EventStream disabled',() => { + let didResolver: DidResolver; + let messageStore: MessageStore; + let dataStore: DataStore; + let eventLog: EventLog; + let dwn: Dwn; + + // important to follow the `before` and `after` pattern to initialize and clean the stores in tests + // so that different test suites can reuse the same backend store for testing + before(async () => { + didResolver = new DidResolver([new DidKeyResolver()]); + + const stores = TestStores.get(); + messageStore = stores.messageStore; + dataStore = stores.dataStore; + eventLog = stores.eventLog; + + dwn = await Dwn.create({ + didResolver, + messageStore, + dataStore, + eventLog, + }); + + }); + + + beforeEach(async () => { + sinon.restore(); // wipe all previous stubs/spies/mocks/fakes + + // clean up before each test rather than after so that a test does not depend on other tests to do the clean up + await messageStore.clear(); + await dataStore.clear(); + await eventLog.clear(); + }); + + after(async () => { + await dwn.close(); + }); + + it('should respond with a 501 if subscriptions are not supported', async () => { + await dwn.close(); // close the original dwn instance + dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog }); // leave out eventStream + + const alice = await DidKeyResolver.generate(); + // attempt to subscribe + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + }); + const subscriptionMessageReply = await dwn.processMessage(alice.did, message, { subscriptionHandler: (_) => {} }); + expect(subscriptionMessageReply.status.code).to.equal(501, subscriptionMessageReply.status.detail); + expect(subscriptionMessageReply.status.detail).to.include(DwnErrorCode.RecordsSubscribeEventStreamUnimplemented); + }); + }); + + describe('functional tests', () => { + let didResolver: DidResolver; + let messageStore: MessageStore; + let dataStore: DataStore; + let eventLog: EventLog; + let eventStream: EventStream; + let dwn: Dwn; + + // important to follow the `before` and `after` pattern to initialize and clean the stores in tests + // so that different test suites can reuse the same backend store for testing + before(async () => { + didResolver = new DidResolver([new DidKeyResolver()]); + + const stores = TestStores.get(); + messageStore = stores.messageStore; + dataStore = stores.dataStore; + eventLog = stores.eventLog; + eventStream = TestEventStream.get(); + + dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); + }); + + beforeEach(async () => { + sinon.restore(); // wipe all previous stubs/spies/mocks/fakes + + // clean up before each test rather than after so that a test does not depend on other tests to do the clean up + await messageStore.clear(); + await dataStore.clear(); + await eventLog.clear(); + }); + + after(async () => { + await dwn.close(); + }); + + it('should return a subscription object', async () => { + const alice = await DidKeyResolver.generate(); + + const recordsSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { schema: 'some-schema' }, + }); + + // Send records subscribe message + const reply = await dwn.processMessage(alice.did, recordsSubscribe.message, { subscriptionHandler: () => {} }); + expect(reply.status.code).to.equal(200); + expect(reply.subscription).to.exist; + }); + + it('should return 400 if protocol is not normalized', async () => { + const alice = await DidKeyResolver.generate(); + + // subscribe for non-normalized protocol + const recordsSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { protocol: 'example.com/' }, + }); + + // overwrite protocol because #create auto-normalizes protocol + recordsSubscribe.message.descriptor.filter.protocol = 'example.com/'; + + // Re-create auth because we altered the descriptor after signing + recordsSubscribe.message.authorization = await Message.createAuthorization({ + descriptor : recordsSubscribe.message.descriptor, + signer : Jws.createSigner(alice) + }); + + // Send records subscribe message + const reply = await dwn.processMessage(alice.did, recordsSubscribe.message); + expect(reply.status.code).to.equal(400); + expect(reply.status.detail).to.contain(DwnErrorCode.UrlProtocolNotNormalized); + }); + + it('should return 400 if schema is not normalized', async () => { + const alice = await DidKeyResolver.generate(); + + // subscribe for non-normalized schema + const recordsSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { schema: 'example.com/' }, + }); + + // overwrite schema because #create auto-normalizes schema + recordsSubscribe.message.descriptor.filter.schema = 'example.com/'; + + // Re-create auth because we altered the descriptor after signing + recordsSubscribe.message.authorization = await Message.createAuthorization({ + descriptor : recordsSubscribe.message.descriptor, + signer : Jws.createSigner(alice) + }); + + // Send records subscribe message + const reply = await dwn.processMessage(alice.did, recordsSubscribe.message); + expect(reply.status.code).to.equal(400); + expect(reply.status.detail).to.contain(DwnErrorCode.UrlSchemaNotNormalized); + }); + + it('should return 400 if published is set to false and a datePublished range is provided', async () => { + const fromDatePublished = Time.getCurrentTimestamp(); + const alice = await DidKeyResolver.generate(); + // set to true so create does not fail + const recordSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { datePublished: { from: fromDatePublished }, published: true } + }); + + // set to false + recordSubscribe.message.descriptor.filter.published = false; + const subscribeResponse = await dwn.processMessage(alice.did, recordSubscribe.message); + expect(subscribeResponse.status.code).to.equal(400); + expect(subscribeResponse.status.detail).to.contain('descriptor/filter/published: must be equal to one of the allowed values'); + }); + + it('should return 401 for anonymous subscriptions that filter explicitly for unpublished records', async () => { + const alice = await DidKeyResolver.generate(); + + // create an unpublished record + const draftWrite = await TestDataGenerator.generateRecordsWrite({ author: alice, schema: 'post' }); + const draftWriteReply = await dwn.processMessage(alice.did, draftWrite.message, { dataStream: draftWrite.dataStream }); + expect(draftWriteReply.status.code).to.equal(202); + + // validate that alice can subscribe + const unpublishedPostSubscribe = await TestDataGenerator.generateRecordsSubscribe({ author: alice, filter: { schema: 'post', published: false } }); + const unpublishedPostReply = await dwn.processMessage(alice.did, unpublishedPostSubscribe.message, { subscriptionHandler: () => {} }); + expect(unpublishedPostReply.status.code).to.equal(200); + expect(unpublishedPostReply.subscription).to.exist; + + // anonymous subscribe for unpublished records + const unpublishedAnonymous = await RecordsSubscribe.create({ filter: { schema: 'post', published: false } }); + const anonymousPostReply = await dwn.processMessage(alice.did, unpublishedAnonymous.message); + expect(anonymousPostReply.status.code).to.equal(401); + expect(anonymousPostReply.status.detail).contains('Missing JWS'); + expect(anonymousPostReply.subscription).to.not.exist; + }); + + it('should return 401 if signature check fails', async () => { + const { author, message } = await TestDataGenerator.generateRecordsSubscribe(); + const tenant = author!.did; + + // setting up a stub did resolver & message store + // intentionally not supplying the public key so a different public key is generated to simulate invalid signature + const mismatchingPersona = await TestDataGenerator.generatePersona({ did: author!.did, keyId: author!.keyId }); + const didResolver = TestStubGenerator.createDidResolverStub(mismatchingPersona); + const messageStore = stubInterface(); + const eventStream = stubInterface(); + + const recordsSubscribeHandler = new RecordsSubscribeHandler(didResolver, messageStore, eventStream); + const reply = await recordsSubscribeHandler.handle({ tenant, message, subscriptionHandler: () => {} }); + + expect(reply.status.code).to.equal(401); + }); + + it('should return 400 if fail parsing the message', async () => { + const { author, message } = await TestDataGenerator.generateRecordsSubscribe(); + const tenant = author!.did; + + // setting up a stub method resolver & message store + const didResolver = TestStubGenerator.createDidResolverStub(author!); + const messageStore = stubInterface(); + const eventStream = stubInterface(); + const recordsSubscribeHandler = new RecordsSubscribeHandler(didResolver, messageStore, eventStream); + + // stub the `parse()` function to throw an error + sinon.stub(RecordsSubscribe, 'parse').throws('anyError'); + const reply = await recordsSubscribeHandler.handle({ tenant, message, subscriptionHandler: () => {} }); + + expect(reply.status.code).to.equal(400); + }); + + describe('protocol based subscriptions', () => { + it('does not try protocol authorization if protocolRole is not invoked', async () => { + // scenario: Alice creates a thread and writes some chat messages. Alice addresses + // only one chat message to Bob. Bob subscribes by protocol URI without invoking a protocolRole, + // and he is able to receive the message addressed to him. + + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + const protocolDefinition = threadRoleProtocolDefinition; + + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: alice, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + const messageCids: string[] = []; + const addCid = async (message: RecordsWriteMessage | RecordsDeleteMessage): Promise => { + const messageCid = await Message.getCid(message); + messageCids.push(messageCid); + }; + + const bobSubscription = await TestDataGenerator.generateRecordsSubscribe({ + author : bob, + filter : { + published : false, + protocol : protocolDefinition.protocol, + } + }); + const subscriptionReply = await dwn.processMessage(alice.did, bobSubscription.message, { subscriptionHandler: addCid }); + expect(subscriptionReply.status.code).to.equal(200); + expect(subscriptionReply.subscription).to.exist; + + // Alice writes a 'thread' record + const threadRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + protocol : protocolDefinition.protocol, + protocolPath : 'thread', + }); + const threadRoleReply = await dwn.processMessage(alice.did, threadRecord.message, { dataStream: threadRecord.dataStream }); + expect(threadRoleReply.status.code).to.equal(202); + + // Alice writes one 'chat' record addressed to Bob + const chatRecordForBob = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : bob.did, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat', + published : false, + contextId : threadRecord.message.contextId, + parentId : threadRecord.message.recordId, + data : new TextEncoder().encode('Bob can read this cuz he is my friend'), + }); + const chatRecordForBobReply = await dwn.processMessage(alice.did, chatRecordForBob.message, { dataStream: chatRecordForBob.dataStream }); + expect(chatRecordForBobReply.status.code).to.equal(202); + + // Alice writes two 'chat' records NOT addressed to Bob + for (let i = 0; i < 2; i++) { + const chatRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : alice.did, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat', + published : false, + contextId : threadRecord.message.contextId, + parentId : threadRecord.message.recordId, + data : new TextEncoder().encode('Bob cannot read this'), + }); + const chatReply = await dwn.processMessage(alice.did, chatRecord.message, { dataStream: chatRecord.dataStream }); + expect(chatReply.status.code).to.equal(202); + } + + expect(messageCids.length).to.equal(1, 'before delete'); + expect(messageCids[0]).to.equal(await Message.getCid(chatRecordForBob.message)); + }); + + it('allows $globalRole authorized subscriptions', async () => { + // scenario: Alice creates a thread and writes some chat messages writes a chat message. Bob invokes his + // thread member role in order to subscribe to the chat messages. + + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + const protocolDefinition = friendRoleProtocolDefinition; + + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: alice, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + const filter: RecordsFilter = { + published : false, + protocol : protocolDefinition.protocol, + protocolPath : 'chat' + }; + + const noRoleRecords: string[] = []; + const addNoRole = async (message: GenericMessage): Promise => { + if (message.descriptor.interface === DwnInterfaceName.Records && message.descriptor.method === DwnMethodName.Write) { + const recordsWriteMessage = message as RecordsWriteMessage; + noRoleRecords.push(recordsWriteMessage.recordId); + } + }; + + // subscribe without role, expect no messages + const noRoleSubscription = await TestDataGenerator.generateRecordsSubscribe({ + author: bob, + filter + }); + + const subscriptionReply = await dwn.processMessage(alice.did, noRoleSubscription.message, { subscriptionHandler: addNoRole }); + expect(subscriptionReply.status.code).to.equal(200); + expect(subscriptionReply.subscription).to.exist; + + + // Alice writes a 'friend' $globalRole record with Bob as recipient + const friendRoleRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : bob.did, + protocol : protocolDefinition.protocol, + protocolPath : 'friend', + data : new TextEncoder().encode('Bob is my friend'), + }); + const friendRoleReply = await dwn.processMessage(alice.did, friendRoleRecord.message, { dataStream: friendRoleRecord.dataStream }); + expect(friendRoleReply.status.code).to.equal(202); + + const recordIds: string[] = []; + const addRecord:RecordSubscriptionHandler = async (message) => { + if (message.descriptor.method === DwnMethodName.Write) { + const recordsWriteMessage = message as RecordsWriteMessage; + recordIds.push(recordsWriteMessage.recordId); + } + }; + + // subscribe with friend role + const bobSubscriptionWithRole = await TestDataGenerator.generateRecordsSubscribe({ + filter, + author : bob, + protocolRole : 'friend', + }); + + const subscriptionWithRoleReply = await dwn.processMessage(alice.did, bobSubscriptionWithRole.message, { subscriptionHandler: addRecord }); + expect(subscriptionWithRoleReply.status.code).to.equal(200); + expect(subscriptionWithRoleReply.subscription).to.exist; + + + // Alice writes three 'chat' records + const chatRecordIds = []; + for (let i = 0; i < 3; i++) { + const chatRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : alice.did, + protocol : protocolDefinition.protocol, + protocolPath : 'chat', + published : false, + data : new TextEncoder().encode('Bob can read this cuz he is my friend'), + }); + const chatReply = await dwn.processMessage(alice.did, chatRecord.message, { dataStream: chatRecord.dataStream }); + expect(chatReply.status.code).to.equal(202); + chatRecordIds.push(chatRecord.message.recordId); + } + + // there should not be any messages in the subscription without a friend role. + expect(noRoleRecords.length).to.equal(0); + + // should have all chat messages + expect(recordIds).to.have.members(chatRecordIds); + }); + + it('allows protocol authorized subscriptions', async () => { + // scenario: Alice writes some chat messages. + // Bob, having a thread/participant record, can subscribe to the chat. + + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + const protocolDefinition = threadRoleProtocolDefinition; + + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: alice, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + + // Alice writes a 'thread' record + const threadRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + protocol : protocolDefinition.protocol, + protocolPath : 'thread', + }); + const threadRoleReply = await dwn.processMessage(alice.did, threadRecord.message, { dataStream: threadRecord.dataStream }); + expect(threadRoleReply.status.code).to.equal(202); + + const filter: RecordsFilter = { + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat', + contextId : threadRecord.message.contextId, + }; + + const noRoleRecords: string[] = []; + const addNoRole = async (message: GenericMessage): Promise => { + if (message.descriptor.interface === DwnInterfaceName.Records && message.descriptor.method === DwnMethodName.Write) { + const recordsWriteMessage = message as RecordsWriteMessage; + noRoleRecords.push(recordsWriteMessage.recordId); + } + }; + + // subscribe without role, expect no messages + const noRoleSubscription = await TestDataGenerator.generateRecordsSubscribe({ + author: bob, + filter + }); + + const subscriptionReply = await dwn.processMessage(alice.did, noRoleSubscription.message, { subscriptionHandler: addNoRole }); + expect(subscriptionReply.status.code).to.equal(200); + expect(subscriptionReply.subscription).to.exist; + + // Alice writes a 'participant' $contextRole record with Bob as recipient + const participantRoleRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : bob.did, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/participant', + contextId : threadRecord.message.contextId, + parentId : threadRecord.message.recordId, + data : new TextEncoder().encode('Bob is my friend'), + }); + const participantRoleReply = + await dwn.processMessage(alice.did, participantRoleRecord.message, { dataStream: participantRoleRecord.dataStream }); + expect(participantRoleReply.status.code).to.equal(202); + + const recordIds: string[] = []; + const addRecord:RecordSubscriptionHandler = async (message) => { + if (message.descriptor.method === DwnMethodName.Write) { + const recordsWriteMessage = message as RecordsWriteMessage; + recordIds.push(recordsWriteMessage.recordId); + } + }; + + // subscribe with the participant role + const bobSubscriptionWithRole = await TestDataGenerator.generateRecordsSubscribe({ + filter, + author : bob, + protocolRole : 'thread/participant', + }); + + const subscriptionWithRoleReply = await dwn.processMessage(alice.did, bobSubscriptionWithRole.message, { subscriptionHandler: addRecord }); + expect(subscriptionWithRoleReply.status.code).to.equal(200); + expect(subscriptionWithRoleReply.subscription).to.exist; + + + // Alice writes three 'chat' records + const chatRecordIds = []; + for (let i = 0; i < 3; i++) { + const chatRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : alice.did, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat', + published : false, + contextId : threadRecord.message.contextId, + parentId : threadRecord.message.recordId, + data : new TextEncoder().encode('Bob can read this cuz he is my friend'), + }); + const chatReply = await dwn.processMessage(alice.did, chatRecord.message, { dataStream: chatRecord.dataStream }); + expect(chatReply.status.code).to.equal(202); + chatRecordIds.push(chatRecord.message.recordId); + } + + // there should not be any messages in the subscription without a participant role. + expect(noRoleRecords.length).to.equal(0); + + // should have all chat messages. + expect(recordIds).to.have.members(chatRecordIds); + }); + + it('does not execute protocol subscriptions where protocolPath is missing from the filter', async () => { + // scenario: Alice writes some chat messages. Bob invokes his $globalRole to subscribe those messages, + // but his subscription filter does not include protocolPath. + + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + const protocolDefinition = friendRoleProtocolDefinition; + + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: alice, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + // Alice writes a 'friend' $globalRole record with Bob as recipient + const friendRoleRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : bob.did, + protocol : protocolDefinition.protocol, + protocolPath : 'friend', + data : new TextEncoder().encode('Bob is my friend'), + }); + const friendRoleReply = await dwn.processMessage(alice.did, friendRoleRecord.message, { dataStream: friendRoleRecord.dataStream }); + expect(friendRoleReply.status.code).to.equal(202); + + // Bob invokes his friendRole to subscribe but does not have `protocolPath` in the filter + const chatSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : bob, + filter : { + protocol: protocolDefinition.protocol, + // protocolPath deliberately omitted + }, + protocolRole: 'friend', + }); + const chatSubscribeReply = await dwn.processMessage(alice.did, chatSubscribe.message); + expect(chatSubscribeReply.status.code).to.equal(400); + expect(chatSubscribeReply.status.detail).to.contain(DwnErrorCode.RecordsSubscribeFilterMissingRequiredProperties); + expect(chatSubscribeReply.subscription).to.not.exist; + }); + + it('does not execute $contextRole authorized subscriptions where contextId is missing from the filter', async () => { + // scenario: Alice gives Bob a role allowing him to access a particular chat thread. + // But Bob's filter does not contain a contextId so the subscription fails. + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + const protocolDefinition = threadRoleProtocolDefinition; + + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: alice, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + // Alice writes a 'thread' record + const threadRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + protocol : protocolDefinition.protocol, + protocolPath : 'thread', + }); + const threadRoleReply = await dwn.processMessage(alice.did, threadRecord.message, { dataStream: threadRecord.dataStream }); + expect(threadRoleReply.status.code).to.equal(202); + + // Alice writes a 'friend' $globalRole record with Bob as recipient + const participantRoleRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : bob.did, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/participant', + contextId : threadRecord.message.contextId, + parentId : threadRecord.message.recordId, + data : new TextEncoder().encode('Bob is my friend'), + }); + const participantRoleReply = + await dwn.processMessage(alice.did, participantRoleRecord.message, { dataStream: participantRoleRecord.dataStream }); + expect(participantRoleReply.status.code).to.equal(202); + + // Bob invokes his thread participant role to subscribe but omits the contextId + const chatSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : bob, + filter : { + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat', + // contextId deliberately omitted + }, + protocolRole: 'thread/participant', + }); + const chatSubscribeReply = await dwn.processMessage(alice.did, chatSubscribe.message); + expect(chatSubscribeReply.status.code).to.eq(401); + expect(chatSubscribeReply.status.detail).to.contain(DwnErrorCode.ProtocolAuthorizationMissingContextId); + expect(chatSubscribeReply.subscription).to.not.exist; + }); + + it('rejects $globalRole authorized subscriptions if the request author does not have a matching $globalRole', async () => { + // scenario: Alice installs a chat protocol. + // Bob invokes a $globalRole within that protocol to subscribe but fails because he does not actually have a role. + + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + const protocolDefinition = friendRoleProtocolDefinition; + + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: alice, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + // Bob invokes a friendRole he does not have to subscribe to the records + const chatSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : bob, + filter : { + protocol : protocolDefinition.protocol, + protocolPath : 'chat', + }, + protocolRole: 'friend', + }); + const chatSubscribeReply = await dwn.processMessage(alice.did, chatSubscribe.message); + expect(chatSubscribeReply.status.code).to.eq(401); + expect(chatSubscribeReply.status.detail).to.contain(DwnErrorCode.ProtocolAuthorizationMissingRole); + expect(chatSubscribeReply.subscription).to.not.exist; + }); + + it('rejects protocol authorized subscriptions where the subscription author does not have a matching $contextRole', async () => { + + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + const protocolDefinition = threadRoleProtocolDefinition; + + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: alice, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(alice.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + // Alice writes a 'thread' record + const threadRecord = await TestDataGenerator.generateRecordsWrite({ + author : alice, + protocol : protocolDefinition.protocol, + protocolPath : 'thread', + }); + const threadRoleReply = await dwn.processMessage(alice.did, threadRecord.message, { dataStream: threadRecord.dataStream }); + expect(threadRoleReply.status.code).to.equal(202); + + // Bob invokes his a `thread/participant` role which he does not have to subscribe to the records + const chatSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : bob, + filter : { + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat', + contextId : threadRecord.message.contextId, + }, + protocolRole: 'thread/participant', + }); + const chatSubscribeReply = await dwn.processMessage(alice.did, chatSubscribe.message); + expect(chatSubscribeReply.status.code).to.eq(401); + expect(chatSubscribeReply.status.detail).to.contain(DwnErrorCode.ProtocolAuthorizationMissingRole); + expect(chatSubscribeReply.subscription).to.not.exist; + }); + }); + }); + }); +} \ No newline at end of file diff --git a/tests/handlers/records-write.spec.ts b/tests/handlers/records-write.spec.ts index b5d7509a6..4f276fea6 100644 --- a/tests/handlers/records-write.spec.ts +++ b/tests/handlers/records-write.spec.ts @@ -69,7 +69,6 @@ export function testRecordsWriteHandler(): void { messageStore = stores.messageStore; dataStore = stores.dataStore; eventLog = stores.eventLog; - eventStream = TestEventStream.get(); dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); diff --git a/tests/interfaces/records-subscribe.spec.ts b/tests/interfaces/records-subscribe.spec.ts new file mode 100644 index 000000000..241d74e36 --- /dev/null +++ b/tests/interfaces/records-subscribe.spec.ts @@ -0,0 +1,79 @@ +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import dexProtocolDefinition from '../vectors/protocol-definitions/dex.json' assert { type: 'json' }; +import { Jws } from '../../src/utils/jws.js'; +import { RecordsSubscribe } from '../../src/interfaces/records-subscribe.js'; +import { TestDataGenerator } from '../utils/test-data-generator.js'; +import { Time } from '../../src/utils/time.js'; + +chai.use(chaiAsPromised); + +describe('RecordsSubscribe', () => { + describe('create()', () => { + it('should not allow published to be set to false with a datePublished filter also set', async () => { + // test control + const randomDate = TestDataGenerator.randomTimestamp(); + const recordQueryControl = TestDataGenerator.generateRecordsQuery({ + filter: { datePublished: { from: randomDate, }, published: true } + }); + + await expect(recordQueryControl).to.eventually.not.be.rejected; + + const recordQueryRejected = TestDataGenerator.generateRecordsQuery({ + filter: { datePublished: { from: randomDate }, published: false } + }); + await expect(recordQueryRejected).to.eventually.be.rejectedWith('descriptor/filter/published: must be equal to one of the allowed values'); + }); + + it('should use `messageTimestamp` as is if given', async () => { + const alice = await TestDataGenerator.generatePersona(); + + const currentTime = Time.getCurrentTimestamp(); + const recordsQuery = await RecordsSubscribe.create({ + filter : { schema: 'anything' }, + messageTimestamp : currentTime, + signer : Jws.createSigner(alice), + }); + + expect(recordsQuery.message.descriptor.messageTimestamp).to.equal(currentTime); + }); + + it('should auto-normalize protocol URL', async () => { + const alice = await TestDataGenerator.generatePersona(); + + const options = { + recipient : alice.did, + data : TestDataGenerator.randomBytes(10), + dataFormat : 'application/json', + signer : Jws.createSigner(alice), + filter : { protocol: 'example.com/' }, + definition : dexProtocolDefinition + }; + const recordsQuery = await RecordsSubscribe.create(options); + + const message = recordsQuery.message; + + expect(message.descriptor.filter!.protocol).to.eq('http://example.com'); + }); + + it('should auto-normalize schema URL', async () => { + const alice = await TestDataGenerator.generatePersona(); + + const options = { + recipient : alice.did, + data : TestDataGenerator.randomBytes(10), + dataFormat : 'application/json', + signer : Jws.createSigner(alice), + filter : { schema: 'example.com/' }, + definition : dexProtocolDefinition + }; + const recordsQuery = await RecordsSubscribe.create(options); + + const message = recordsQuery.message; + + expect(message.descriptor.filter!.schema).to.eq('http://example.com'); + }); + }); +}); + diff --git a/tests/scenarios/delegated-grant.spec.ts b/tests/scenarios/delegated-grant.spec.ts index 74e533db7..e2104fc14 100644 --- a/tests/scenarios/delegated-grant.spec.ts +++ b/tests/scenarios/delegated-grant.spec.ts @@ -1,5 +1,5 @@ import type { EventStream } from '../../src/types/subscriptions.js'; -import type { DataStore, EventLog, MessageStore, PermissionScope } from '../../src/index.js'; +import type { DataStore, EventLog, MessageStore, PermissionScope, RecordsDeleteMessage, RecordsWriteMessage } from '../../src/index.js'; import chaiAsPromised from 'chai-as-promised'; import emailProtocolDefinition from '../vectors/protocol-definitions/email.json' assert { type: 'json' }; @@ -21,7 +21,7 @@ import { TestEventStream } from '../test-event-stream.js'; import { TestStores } from '../test-stores.js'; import { Time } from '../../src/utils/time.js'; -import { DwnInterfaceName, DwnMethodName, PermissionsGrant, RecordsDelete, RecordsQuery, RecordsRead } from '../../src/index.js'; +import { DwnInterfaceName, DwnMethodName, PermissionsGrant, RecordsDelete, RecordsQuery, RecordsRead, RecordsSubscribe } from '../../src/index.js'; chai.use(chaiAsPromised); @@ -43,7 +43,6 @@ export function testDelegatedGrantScenarios(): void { messageStore = stores.messageStore; dataStore = stores.dataStore; eventLog = stores.eventLog; - eventStream = TestEventStream.get(); dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); @@ -348,6 +347,125 @@ export function testDelegatedGrantScenarios(): void { expect(recordsQueryByCarolReply.status.detail).to.contain(DwnErrorCode.RecordsValidateIntegrityGrantedToAndSignerMismatch); }); + it('should only allow correct entity invoking a delegated grant to subscribe', async () => { + // scenario: + // 1. Bob installs a chat protocol and creates a thread, adding Alice as a participant. + // 2. Alice a creates subscribe delegated grant for device X, + // 3. deviceX creates a subscription to receive events. + // 4. Carol should not be able to read the chat using deviceX's delegated grant. + // 5. Bob writes a chat to the thread. + // 6. The subscription should have received the chat. + + const alice = await DidKeyResolver.generate(); + const deviceX = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + const carol = await DidKeyResolver.generate(); + + // Bob has the chat protocol installed + const protocolDefinition = threadRoleProtocolDefinition; + const protocol = threadRoleProtocolDefinition.protocol; + const protocolsConfig = await TestDataGenerator.generateProtocolsConfigure({ + author: bob, + protocolDefinition + }); + const protocolsConfigureReply = await dwn.processMessage(bob.did, protocolsConfig.message); + expect(protocolsConfigureReply.status.code).to.equal(202); + + // Bob starts a chat thread + const threadRecord = await TestDataGenerator.generateRecordsWrite({ + author : bob, + protocol : protocolDefinition.protocol, + protocolPath : 'thread', + }); + const threadRoleReply = await dwn.processMessage(bob.did, threadRecord.message, { dataStream: threadRecord.dataStream }); + expect(threadRoleReply.status.code).to.equal(202); + + // Bob adds Alice as a participant in the thread + const participantRoleRecord = await TestDataGenerator.generateRecordsWrite({ + author : bob, + recipient : alice.did, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/participant', + contextId : threadRecord.message.contextId, + parentId : threadRecord.message.recordId, + data : new TextEncoder().encode('Alice is my friend'), + }); + const participantRoleReply = await dwn.processMessage(bob.did, participantRoleRecord.message, { dataStream: participantRoleRecord.dataStream }); + expect(participantRoleReply.status.code).to.equal(202); + + // Alice creates a delegated subscribe grant for device X to act as Alice. + const subscribeGrantForDeviceX = await PermissionsGrant.create({ + delegated : true, // this is a delegated grant + dateExpires : Time.createOffsetTimestamp({ seconds: 100 }), + grantedBy : alice.did, + grantedTo : deviceX.did, + grantedFor : alice.did, + scope : { + interface : DwnInterfaceName.Records, + method : DwnMethodName.Subscribe, + protocol + }, + signer: Jws.createSigner(alice) + }); + + const subscriptionChatRecords:Set = new Set(); + const captureChatRecords = async (message: RecordsWriteMessage | RecordsDeleteMessage): Promise => { + if (message.descriptor.method === DwnMethodName.Delete) { + const recordId = message.descriptor.recordId; + subscriptionChatRecords.delete(recordId); + } else { + const recordId = (message as RecordsWriteMessage).recordId; + subscriptionChatRecords.add(recordId); + } + }; + + // verify device X is able to subscribe the chat message from Bob's DWN + const recordsSubscribeByDeviceX = await RecordsSubscribe.create({ + signer : Jws.createSigner(deviceX), + delegatedGrant : subscribeGrantForDeviceX.asDelegatedGrant(), + protocolRole : 'thread/participant', + filter : { + contextId : threadRecord.message.contextId, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat' + } + }); + const recordsSubscribeByDeviceXReply = await dwn.processMessage(bob.did, recordsSubscribeByDeviceX.message, { + subscriptionHandler: captureChatRecords + }); + expect(recordsSubscribeByDeviceXReply.status.code).to.equal(200, 'subscribe'); + + // Verify that Carol cannot subscribe as Alice by invoking the delegated grant granted to Device X + const recordsSubscribeByCarol = await RecordsSubscribe.create({ + signer : Jws.createSigner(carol), + delegatedGrant : subscribeGrantForDeviceX.asDelegatedGrant(), + protocolRole : 'thread/participant', + filter : { + contextId : threadRecord.message.contextId, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat' + } + }); + const recordsSubscribeByCarolReply = await dwn.processMessage(bob.did, recordsSubscribeByCarol.message); + expect(recordsSubscribeByCarolReply.status.code).to.equal(400, 'carol subscribe'); + expect(recordsSubscribeByCarolReply.status.detail).to.contain(DwnErrorCode.RecordsValidateIntegrityGrantedToAndSignerMismatch); + + // Bob writes a chat message in the thread + const chatRecord = await TestDataGenerator.generateRecordsWrite({ + author : bob, + protocol : protocolDefinition.protocol, + protocolPath : 'thread/chat', + contextId : threadRecord.message.contextId, + parentId : threadRecord.message.recordId, + }); + const chatRecordReply = await dwn.processMessage(bob.did, chatRecord.message, { dataStream: chatRecord.dataStream }); + expect(chatRecordReply.status.code).to.equal(202); + + await recordsSubscribeByDeviceXReply.subscription?.close(); + expect(subscriptionChatRecords.size).to.equal(1); + expect([...subscriptionChatRecords]).to.have.members([chatRecord.message.recordId]); + }); + it('should only allow correct entity invoking a delegated grant to delete', async () => { // scenario: // 1. Bob installs the chat protocol on his DWN and makes Alice an admin @@ -532,12 +650,12 @@ export function testDelegatedGrantScenarios(): void { expect(deviceXWriteReply.status.detail).to.contain(DwnErrorCode.RecordsGrantAuthorizationScopeProtocolMismatch); }); - it('should fail if delegated grant has a mismatching protocol scope - query & read', async () => { + it('should fail if delegated grant has a mismatching protocol scope - query, subscribe & read', async () => { // scenario: // 1. Alice creates a delegated grant for device X to act as her for a protocol that is NOT chat protocol // 2. Bob starts a chat thread with Alice on his DWN - // 3. Device X attempts to use the delegated grant to read the chat thread - // 4. Bob's DWN should reject Device X's read attempt + // 3. Device X attempts to use the delegated grant to read, query and subscribe to the chat thread. + // 4. Bob's DWN should reject Device X's read query or subscribe attempts const alice = await DidKeyResolver.generate(); const deviceX = await DidKeyResolver.generate(); @@ -575,6 +693,36 @@ export function testDelegatedGrantScenarios(): void { const participantRoleReply = await dwn.processMessage(bob.did, participantRoleRecord.message, { dataStream: participantRoleRecord.dataStream }); expect(participantRoleReply.status.code).to.equal(202); + // Alice creates a delegated subscribe grant for device X to act as Alice but not for chat protocol + const subscribeGrantForDeviceX = await PermissionsGrant.create({ + delegated : true, // this is a delegated grant + dateExpires : Time.createOffsetTimestamp({ seconds: 100 }), + grantedBy : alice.did, + grantedTo : deviceX.did, + grantedFor : alice.did, + scope : { + interface : DwnInterfaceName.Records, + method : DwnMethodName.Subscribe, + protocol : 'some-protocol' + }, + signer: Jws.createSigner(alice) + }); + + // verify device X subscribing to the chat message from Bob's DWN fails + const recordsSubscribeByDeviceX = await RecordsSubscribe.create({ + signer : Jws.createSigner(deviceX), + delegatedGrant : subscribeGrantForDeviceX.asDelegatedGrant(), + protocolRole : 'thread/participant', + filter : { + protocol, + contextId : threadRecord.message.contextId, + protocolPath : 'thread/chat' + } + }); + const deviceXRecordsSubscribeReply = await dwn.processMessage(bob.did, recordsSubscribeByDeviceX.message); + expect(deviceXRecordsSubscribeReply.status.code).to.equal(401); + expect(deviceXRecordsSubscribeReply.status.detail).to.contain(DwnErrorCode.RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch); + // Bob writes a chat message in the thread const chatRecord = await TestDataGenerator.generateRecordsWrite({ author : bob, @@ -629,7 +777,7 @@ export function testDelegatedGrantScenarios(): void { }); const deviceXRecordsQueryReply = await dwn.processMessage(bob.did, recordsQueryByDeviceX.message); expect(deviceXRecordsQueryReply.status.code).to.equal(401); - expect(deviceXRecordsQueryReply.status.detail).to.contain(DwnErrorCode.RecordsGrantAuthorizationQueryProtocolScopeMismatch); + expect(deviceXRecordsQueryReply.status.detail).to.contain(DwnErrorCode.RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch); // verify device X reading for the chat message from Bob's DWN fails const recordsReadByDeviceX = await RecordsRead.create({ @@ -641,9 +789,9 @@ export function testDelegatedGrantScenarios(): void { } }); - const deviceXWriteReply = await dwn.processMessage(bob.did, recordsReadByDeviceX.message); - expect(deviceXWriteReply.status.code).to.equal(401); - expect(deviceXWriteReply.status.detail).to.contain(DwnErrorCode.RecordsGrantAuthorizationScopeProtocolMismatch); + const deviceXReadReply = await dwn.processMessage(bob.did, recordsReadByDeviceX.message); + expect(deviceXReadReply.status.code).to.equal(401); + expect(deviceXReadReply.status.detail).to.contain(DwnErrorCode.RecordsGrantAuthorizationScopeProtocolMismatch); }); it('should fail if delegated grant has a mismatching protocol scope - delete', async () => { diff --git a/tests/scenarios/end-to-end-tests.spec.ts b/tests/scenarios/end-to-end-tests.spec.ts index 7d2dd27f8..7cd1b24ff 100644 --- a/tests/scenarios/end-to-end-tests.spec.ts +++ b/tests/scenarios/end-to-end-tests.spec.ts @@ -39,7 +39,6 @@ export function testEndToEndScenarios(): void { messageStore = stores.messageStore; dataStore = stores.dataStore; eventLog = stores.eventLog; - eventStream = TestEventStream.get(); dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog, eventStream }); diff --git a/tests/scenarios/subscriptions.spec.ts b/tests/scenarios/subscriptions.spec.ts index dae65cfb3..3137dbfae 100644 --- a/tests/scenarios/subscriptions.spec.ts +++ b/tests/scenarios/subscriptions.spec.ts @@ -924,5 +924,288 @@ export function testSubscriptionScenarios(): void { expect(messageCids).to.eql([ record1MessageCid ]); }); }); + + describe('records subscribe', () => { + it('allows for anonymous subscriptions to published records', async () => { + const alice = await DidKeyResolver.generate(); + + const messages:string[] = []; + const subscriptionHandler = async (message:GenericMessage):Promise => { + messages.push(await Message.getCid(message)); + }; + + const anonymousSubscription = await TestDataGenerator.generateRecordsSubscribe({ + anonymous : true, + filter : { schema: 'http://schema1' } + }); + + const anonymousSubscriptionReply = await dwn.processMessage(alice.did, anonymousSubscription.message, { + subscriptionHandler + }); + expect(anonymousSubscriptionReply.status.code).to.equal(200); + expect(anonymousSubscriptionReply.subscription).to.exist; + + const write1 = await TestDataGenerator.generateRecordsWrite({ author: alice, schema: 'http://schema1', published: true }); + const write1Reply = await dwn.processMessage(alice.did, write1.message, { dataStream: write1.dataStream }); + expect(write1Reply.status.code).to.equal(202); + + const write2 = await TestDataGenerator.generateRecordsWrite({ author: alice, schema: 'http://schema1', published: true }); + const write2Reply = await dwn.processMessage(alice.did, write2.message, { dataStream: write2.dataStream }); + expect(write2Reply.status.code).to.equal(202); + + // will not be emitted as it is not explicitly published + const writeNotPublished = await TestDataGenerator.generateRecordsWrite({ author: alice, schema: 'http://schema1' }); + const writeNotPublishedReply = await dwn.processMessage(alice.did, writeNotPublished.message, { dataStream: writeNotPublished.dataStream }); + expect(writeNotPublishedReply.status.code).to.equal(202); + + // await for handler to receive and process the message + await Time.minimalSleep(); + + expect(messages.length).to.equal(2); + expect(messages).to.have.members([ + await Message.getCid(write1.message), + await Message.getCid(write2.message), + ]); + }); + + it('allows authorized subscriptions to records intended for a recipient', async () => { + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + const carol = await DidKeyResolver.generate(); + + // bob subscribes to any messages he's authorized to see + const bobMessages:string[] = []; + const bobSubscribeHandler = async (message:GenericMessage):Promise => { + bobMessages.push(await Message.getCid(message)); + }; + + const bobSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : bob, + filter : { schema: 'http://schema1' } + }); + + const bobSubscribeReply = await dwn.processMessage(alice.did, bobSubscribe.message, { + subscriptionHandler: bobSubscribeHandler + }); + expect(bobSubscribeReply.status.code).to.equal(200); + expect(bobSubscribeReply.subscription).to.exist; + + // carol subscribes to any messages she's the recipient of. + const carolMessages:string[] = []; + const carolSubscribeHandler = async (message:GenericMessage):Promise => { + carolMessages.push(await Message.getCid(message)); + }; + + const carolSubscribe = await TestDataGenerator.generateRecordsSubscribe({ + author : carol, + filter : { schema: 'http://schema1', recipient: carol.did } + }); + + const carolSubscribeReply = await dwn.processMessage(alice.did, carolSubscribe.message, { + subscriptionHandler: carolSubscribeHandler + }); + expect(carolSubscribeReply.status.code).to.equal(200); + expect(carolSubscribeReply.subscription).to.exist; + + const write1 = await TestDataGenerator.generateRecordsWrite({ author: alice, schema: 'http://schema1', recipient: bob.did }); + const write1Reply = await dwn.processMessage(alice.did, write1.message, { dataStream: write1.dataStream }); + expect(write1Reply.status.code).to.equal(202); + + const write2 = await TestDataGenerator.generateRecordsWrite({ author: alice, schema: 'http://schema1', recipient: bob.did }); + const write2Reply = await dwn.processMessage(alice.did, write2.message, { dataStream: write2.dataStream }); + expect(write2Reply.status.code).to.equal(202); + + // message for carol only + const writeForCarol = await TestDataGenerator.generateRecordsWrite({ author: alice, schema: 'http://schema1', recipient: carol.did }); + const writeForCarolReply = await dwn.processMessage(alice.did, writeForCarol.message, { dataStream: writeForCarol.dataStream }); + expect(writeForCarolReply.status.code).to.equal(202); + + // await for handler to receive and process the message + await Time.minimalSleep(); + + expect(bobMessages.length).to.equal(2); + expect(bobMessages).to.have.members([ + await Message.getCid(write1.message), + await Message.getCid(write2.message), + ]); + + expect(carolMessages.length).to.equal(1); + expect(carolMessages).to.have.members([ + await Message.getCid(writeForCarol.message), + ]); + }); + + it('filters by protocol & parentId across multiple protocolPaths', async () => { + // scenario: subscribe to multiple protocolPaths for a given protocol and parentId + // alice installs a protocol and creates a thread + // alice subscribes to update to that thread, it's participant as well as thread chats + // alice adds bob and carol as participants to the thread + // alice, bob, and carol all create messages + // alice deletes carol participant message + // alice checks that the correct messages were omitted + + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + const carol = await DidKeyResolver.generate(); + + // create protocol + const protocolConfigure = await TestDataGenerator.generateProtocolsConfigure({ + author : alice, + protocolDefinition : { ...threadProtocol } + }); + const protocolConfigureReply = await dwn.processMessage(alice.did, protocolConfigure.message); + expect(protocolConfigureReply.status.code).to.equal(202); + const protocol = protocolConfigure.message.descriptor.definition.protocol; + + // alice creates thread + const thread = await TestDataGenerator.generateRecordsWrite({ + author : alice, + protocol : protocol, + protocolPath : 'thread' + }); + const threadReply = await dwn.processMessage(alice.did, thread.message, { dataStream: thread.dataStream }); + expect(threadReply.status.code).to.equal(202); + + + // subscribe to this thread's events + const messages:string[] = []; + const subscriptionHandler = async (message:GenericMessage):Promise => { + messages.push(await Message.getCid(message)); + }; + + const threadSubscription = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { protocol: protocol, protocolPath: 'thread', parentId: thread.message.recordId }, // thread updates + }); + const threadSubscriptionReply = await dwn.processMessage(alice.did, threadSubscription.message, { + subscriptionHandler + }); + expect(threadSubscriptionReply.status.code).to.equal(200); + expect(threadSubscriptionReply.subscription).to.exist; + + const participantSubscription = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { protocol: protocol, protocolPath: 'thread/participant', parentId: thread.message.recordId }, // participant updates + }); + const participantSubscriptionReply = await dwn.processMessage(alice.did, participantSubscription.message, { + subscriptionHandler + }); + expect(participantSubscriptionReply.status.code).to.equal(200); + expect(participantSubscriptionReply.subscription).to.exist; + + const chatSubscription = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { protocol: protocol, protocolPath: 'thread/chat', parentId: thread.message.recordId } // chat updates + }); + const chatSubscriptionReply = await dwn.processMessage(alice.did, chatSubscription.message, { + subscriptionHandler + }); + expect(chatSubscriptionReply.status.code).to.equal(200); + expect(chatSubscriptionReply.subscription).to.exist; + + // add bob as participant + const bobParticipant = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : bob.did, + parentId : thread.message.recordId, + contextId : thread.message.contextId, + protocol : protocol, + protocolPath : 'thread/participant' + }); + const bobParticipantReply = await dwn.processMessage(alice.did, bobParticipant.message, { dataStream: bobParticipant.dataStream }); + expect(bobParticipantReply.status.code).to.equal(202); + + // add carol as participant + const carolParticipant = await TestDataGenerator.generateRecordsWrite({ + author : alice, + recipient : carol.did, + parentId : thread.message.recordId, + contextId : thread.message.contextId, + protocol : protocol, + protocolPath : 'thread/participant' + }); + const carolParticipantReply = await dwn.processMessage(alice.did, carolParticipant.message, { dataStream: carolParticipant.dataStream }); + expect(carolParticipantReply.status.code).to.equal(202); + + // add another thread as a control, will not show up in handled events + const additionalThread = await TestDataGenerator.generateRecordsWrite({ + author : alice, + protocol : protocol, + protocolPath : 'thread' + }); + const additionalThreadReply = await dwn.processMessage(alice.did, additionalThread.message, { dataStream: additionalThread.dataStream }); + expect(additionalThreadReply.status.code).to.equal(202); + + // sleep to allow all messages to be processed by the handler message + await Time.minimalSleep(); + + expect(messages.length).to.equal(2); + expect(messages).to.have.members([ + await Message.getCid(bobParticipant.message), + await Message.getCid(carolParticipant.message), + ]); + + // add a message to protocol1 + const message1 = await TestDataGenerator.generateRecordsWrite({ + author : bob, + recipient : alice.did, + parentId : thread.message.recordId, + contextId : thread.message.contextId, + protocol : protocol, + protocolPath : 'thread/chat', + protocolRole : 'thread/participant', + }); + const message1Reply = await dwn.processMessage(alice.did, message1.message, { dataStream: message1.dataStream }); + expect(message1Reply.status.code).to.equal(202); + + const message2 = await TestDataGenerator.generateRecordsWrite({ + author : bob, + recipient : alice.did, + parentId : thread.message.recordId, + contextId : thread.message.contextId, + protocol : protocol, + protocolPath : 'thread/chat', + protocolRole : 'thread/participant', + }); + const message2Reply = await dwn.processMessage(alice.did, message2.message, { dataStream: message2.dataStream }); + expect(message2Reply.status.code).to.equal(202); + + const message3 = await TestDataGenerator.generateRecordsWrite({ + author : carol, + recipient : alice.did, + parentId : thread.message.recordId, + contextId : thread.message.contextId, + protocol : protocol, + protocolPath : 'thread/chat', + protocolRole : 'thread/participant', + }); + const message3Reply = await dwn.processMessage(alice.did, message3.message, { dataStream: message3.dataStream }); + expect(message3Reply.status.code).to.equal(202); + + // sleep in order to allow messages to process and check for the added messages + await Time.minimalSleep(); + expect(messages.length).to.equal(5); + expect(messages).to.include.members([ + await Message.getCid(message1.message), + await Message.getCid(message2.message), + await Message.getCid(message3.message), + ]); + + // delete carol participant + const deleteCarol = await TestDataGenerator.generateRecordsDelete({ + author : alice, + recordId : carolParticipant.message.recordId + }); + const deleteCarolReply = await dwn.processMessage(alice.did, deleteCarol.message); + expect(deleteCarolReply.status.code).to.equal(202); + + // sleep in order to allow messages to process and check for the delete message + await Time.minimalSleep(); + expect(messages.length).to.equal(6); + expect(messages).to.include.members([ + await Message.getCid(deleteCarol.message) + ]); + }); + }); }); } \ No newline at end of file diff --git a/tests/test-suite.ts b/tests/test-suite.ts index 01e456b99..1e2d51cb3 100644 --- a/tests/test-suite.ts +++ b/tests/test-suite.ts @@ -17,6 +17,7 @@ import { testProtocolsQueryHandler } from './handlers/protocols-query.spec.js'; import { testRecordsDeleteHandler } from './handlers/records-delete.spec.js'; import { testRecordsQueryHandler } from './handlers/records-query.spec.js'; import { testRecordsReadHandler } from './handlers/records-read.spec.js'; +import { testRecordsSubscribeHandler } from './handlers/records-subscribe.spec.js'; import { testRecordsWriteHandler } from './handlers/records-write.spec.js'; import { TestStores } from './test-stores.js'; import { testSubscriptionScenarios } from './scenarios/subscriptions.spec.js'; @@ -52,6 +53,7 @@ export class TestSuite { testRecordsDeleteHandler(); testRecordsQueryHandler(); testRecordsReadHandler(); + testRecordsSubscribeHandler(); testRecordsWriteHandler(); // scenario tests diff --git a/tests/utils/test-data-generator.ts b/tests/utils/test-data-generator.ts index 146877f48..b6a0f690d 100644 --- a/tests/utils/test-data-generator.ts +++ b/tests/utils/test-data-generator.ts @@ -11,7 +11,7 @@ import type { ProtocolsConfigureOptions } from '../../src/interfaces/protocols-c import type { ProtocolsQueryOptions } from '../../src/interfaces/protocols-query.js'; import type { Readable } from 'readable-stream'; import type { RecordsQueryOptions } from '../../src/interfaces/records-query.js'; -import type { RecordsWriteMessage } from '../../src/types/records-types.js'; +import type { RecordsSubscribeOptions } from '../../src/interfaces/records-subscribe.js'; import type { Signer } from '../../src/types/signer.js'; import type { AuthorizationModel, Pagination } from '../../src/types/message-types.js'; import type { CreateFromOptions, EncryptionInput, KeyEncryptionInput, RecordsWriteOptions } from '../../src/interfaces/records-write.js'; @@ -21,6 +21,7 @@ import type { PermissionConditions, PermissionScope } from '../../src/types/perm import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from '../../src/types/permissions-types.js'; import type { PrivateJwk, PublicJwk } from '../../src/types/jose-types.js'; import type { ProtocolDefinition, ProtocolsConfigureMessage, ProtocolsQueryMessage } from '../../src/types/protocols-types.js'; +import type { RecordsSubscribeMessage, RecordsWriteMessage } from '../../src/types/records-types.js'; import * as cbor from '@ipld/dag-cbor'; @@ -42,6 +43,7 @@ import { ProtocolsQuery } from '../../src/interfaces/protocols-query.js'; import { Records } from '../../src/utils/records.js'; import { RecordsDelete } from '../../src/interfaces/records-delete.js'; import { RecordsQuery } from '../../src/interfaces/records-query.js'; +import { RecordsSubscribe } from '../../src/interfaces/records-subscribe.js'; import { RecordsWrite } from '../../src/interfaces/records-write.js'; import { removeUndefinedProperties } from '../../src/utils/object.js'; import { Secp256k1 } from '../../src/utils/secp256k1.js'; @@ -166,6 +168,22 @@ export type GenerateRecordsQueryOutput = { message: RecordsQueryMessage; }; +export type GenerateRecordsSubscribeInput = { + /** + * Treated as `false` if not given. + */ + anonymous?: boolean; + author?: Persona; + messageTimestamp?: string; + filter?: RecordsFilter; + protocolRole?: string; +}; + +export type GenerateRecordsSubscribeOutput = { + author: Persona | undefined; + message: RecordsSubscribeMessage; +}; + export type GenerateRecordsDeleteInput = { author?: Persona; recordId?: string; @@ -652,6 +670,44 @@ export class TestDataGenerator { }; }; + /** + * Generates a RecordsSubscribe message for testing. + */ + public static async generateRecordsSubscribe(input?: GenerateRecordsSubscribeInput): Promise { + let author = input?.author; + const anonymous: boolean = input?.anonymous ?? false; + + if (anonymous && author) { + throw new Error('Cannot have `author` and be anonymous at the same time.'); + } + + // generate author if needed + if (author === undefined && !anonymous) { + author = await TestDataGenerator.generatePersona(); + } + + let signer = undefined; + if (author !== undefined) { + signer = Jws.createSigner(author); + } + + const options: RecordsSubscribeOptions = { + messageTimestamp : input?.messageTimestamp, + signer, + filter : input?.filter ?? { schema: TestDataGenerator.randomString(10) }, // must have one filter property if no filter is given + protocolRole : input?.protocolRole, + }; + removeUndefinedProperties(options); + + const recordsSubscribe = await RecordsSubscribe.create(options); + const message = recordsSubscribe.message; + + return { + author, + message + }; + } + /** * Generates a RecordsDelete for testing. */ diff --git a/tests/vectors/protocol-definitions/friend-role.json b/tests/vectors/protocol-definitions/friend-role.json index fc8e650ab..88ac3901c 100644 --- a/tests/vectors/protocol-definitions/friend-role.json +++ b/tests/vectors/protocol-definitions/friend-role.json @@ -34,6 +34,10 @@ "role": "friend", "can": "query" }, + { + "role": "friend", + "can": "subscribe" + }, { "role": "admin", "can": "update" diff --git a/tests/vectors/protocol-definitions/thread-role.json b/tests/vectors/protocol-definitions/thread-role.json index 88ac9842d..ce2a07d72 100644 --- a/tests/vectors/protocol-definitions/thread-role.json +++ b/tests/vectors/protocol-definitions/thread-role.json @@ -49,6 +49,10 @@ "role": "thread/participant", "can": "query" }, + { + "role": "thread/participant", + "can": "subscribe" + }, { "role": "thread/admin", "can": "update"