Skip to content

Commit

Permalink
RecordsSubscribe (#667)
Browse files Browse the repository at this point in the history
This PR builds on top of the existing `EventStream` and adds a subscription specifically for the `Records` interface which follows similar rules as those for `RecordsQuery`.

Currently the subscription does not evict or re-authorize subscriptions after the initial processing of the message. So if a revocation of permissions happens after the subscription is live, the user will continue getting notified until they close the connection.

This will be addressed in a separate PR to keep things more concise and easier to follow. #668
  • Loading branch information
LiranCohen authored Jan 24, 2024
1 parent 4e89df6 commit 8f6e01e
Show file tree
Hide file tree
Showing 36 changed files with 1,805 additions and 104 deletions.
10 changes: 9 additions & 1 deletion Q_AND_A.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

No.

- When making `RecordsQuery` by invoking a protocol role, why is `protocolPath` a required filter property? This means that one cannot filter records under a `protocol` or `contextId` irrespective of the `protocolPath`, thus is forced to make multiple queries (ie. one per `protocolPath`).
- When making `RecordsQuery` or `RecordsSubscribe` by invoking a protocol role, why is `protocolPath` a required filter property? This means that one cannot filter records under a `protocol` or `contextId` irrespective of the `protocolPath`, thus is forced to make multiple queries (ie. one per `protocolPath`).

(Last update: 2023/11/03)

Expand All @@ -79,3 +79,11 @@
- `write` - allows a DID to create and update the record they have created
- `update` - allows a DID to update a record, regardless of the initial author

## Subscriptions
- What happens to a subscription which is listening to events, but is no longer authorized due to revocation of a grant or role?

(Last update: 2024/01/23)

Currently if a subscription is no longer authorized but it is still active, the subscriber will still receive updates until they close the subscription themselves. If they try to re-subscribe after that, it will be rejected with a 401.

This will be addressed in a future upgrade and we've created an issue to track it. https://github.com/TBD54566975/dwn-sdk-js/issues/668 - last updated (2024/01/22)
2 changes: 2 additions & 0 deletions build/compile-validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import RecordsDelete from '../json-schemas/interface-methods/records-delete.json
import RecordsFilter from '../json-schemas/interface-methods/records-filter.json' assert { type: 'json' };
import RecordsQuery from '../json-schemas/interface-methods/records-query.json' assert { type: 'json' };
import RecordsRead from '../json-schemas/interface-methods/records-read.json' assert { type: 'json' };
import RecordsSubscribe from '../json-schemas/interface-methods/records-subscribe.json' assert { type: 'json' };
import RecordsWrite from '../json-schemas/interface-methods/records-write.json' assert { type: 'json' };
import RecordsWriteSignaturePayload from '../json-schemas/signature-payloads/records-write-signature-payload.json' assert { type: 'json' };
import RecordsWriteUnidentified from '../json-schemas/interface-methods/records-write-unidentified.json' assert { type: 'json' };
Expand All @@ -56,6 +57,7 @@ const schemas = {
AuthorizationOwner,
RecordsDelete,
RecordsQuery,
RecordsSubscribe,
RecordsWrite,
RecordsWriteUnidentified,
EventsFilter,
Expand Down
1 change: 1 addition & 0 deletions json-schemas/interface-methods/protocol-rule-set.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"enum": [
"delete",
"query",
"subscribe",
"read",
"update",
"write"
Expand Down
44 changes: 44 additions & 0 deletions json-schemas/interface-methods/records-subscribe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://identity.foundation/dwn/json-schemas/records-subscribe.json",
"type": "object",
"additionalProperties": false,
"required": [
"descriptor"
],
"properties": {
"authorization": {
"$ref": "https://identity.foundation/dwn/json-schemas/authorization-delegated-grant.json"
},
"descriptor": {
"type": "object",
"additionalProperties": false,
"required": [
"interface",
"method",
"messageTimestamp",
"filter"
],
"properties": {
"interface": {
"enum": [
"Records"
],
"type": "string"
},
"method": {
"enum": [
"Subscribe"
],
"type": "string"
},
"messageTimestamp": {
"$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/definitions/date-time"
},
"filter": {
"$ref": "https://identity.foundation/dwn/json-schemas/records-filter.json"
}
}
}
}
}
3 changes: 3 additions & 0 deletions json-schemas/permissions/permissions-definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
},
{
"$ref": "https://identity.foundation/dwn/json-schemas/permissions/scopes.json#/definitions/records-query-scope"
},
{
"$ref": "https://identity.foundation/dwn/json-schemas/permissions/scopes.json#/definitions/records-subscribe-scope"
}
]
},
Expand Down
18 changes: 18 additions & 0 deletions json-schemas/permissions/scopes.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@
"type": "string"
}
}
},
"records-subscribe-scope": {
"type": "object",
"required": [
"interface",
"method"
],
"properties": {
"interface": {
"const": "Records"
},
"method": {
"const": "Subscribe"
},
"protocol": {
"type": "string"
}
}
}
}
}
4 changes: 3 additions & 1 deletion src/core/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export enum DwnErrorCode {
RecordsGrantAuthorizationConditionPublicationProhibited = 'RecordsGrantAuthorizationConditionPublicationProhibited',
RecordsGrantAuthorizationConditionPublicationRequired = 'RecordsGrantAuthorizationConditionPublicationRequired',
RecordsGrantAuthorizationDeleteProtocolScopeMismatch = 'RecordsGrantAuthorizationDeleteProtocolScopeMismatch',
RecordsGrantAuthorizationQueryProtocolScopeMismatch = 'RecordsGrantAuthorizationQueryProtocolScopeMismatch',
RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch = 'RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch',
RecordsGrantAuthorizationScopeContextIdMismatch = 'RecordsGrantAuthorizationScopeContextIdMismatch',
RecordsGrantAuthorizationScopeNotProtocol = 'RecordsGrantAuthorizationScopeNotProtocol',
RecordsGrantAuthorizationScopeProtocolMismatch = 'RecordsGrantAuthorizationScopeProtocolMismatch',
Expand All @@ -103,6 +103,8 @@ export enum DwnErrorCode {
RecordsQueryFilterMissingRequiredProperties = 'RecordsQueryFilterMissingRequiredProperties',
RecordsReadReturnedMultiple = 'RecordsReadReturnedMultiple',
RecordsReadAuthorizationFailed = 'RecordsReadAuthorizationFailed',
RecordsSubscribeEventStreamUnimplemented = 'RecordsSubscribeEventStreamUnimplemented',
RecordsSubscribeFilterMissingRequiredProperties = 'RecordsSubscribeFilterMissingRequiredProperties',
RecordsSchemasDerivationSchemeMissingSchema = 'RecordsSchemasDerivationSchemeMissingSchema',
RecordsValidateIntegrityDelegatedGrantAndIdExistenceMismatch = 'RecordsValidateIntegrityDelegatedGrantAndIdExistenceMismatch',
RecordsValidateIntegrityGrantedToAndSignerMismatch = 'RecordsValidateIntegrityGrantedToAndSignerMismatch',
Expand Down
23 changes: 11 additions & 12 deletions src/core/protocol-authorization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { MessageStore } from '../types/message-store.js';
import type { RecordsDelete } from '../interfaces/records-delete.js';
import type { RecordsQuery } from '../interfaces/records-query.js';
import type { RecordsRead } from '../interfaces/records-read.js';
import type { RecordsSubscribe } from '../interfaces/records-subscribe.js';
import type { RecordsWriteMessage } from '../types/records-types.js';
import type { ProtocolActionRule, ProtocolDefinition, ProtocolRuleSet, ProtocolsConfigureMessage, ProtocolType, ProtocolTypes } from '../types/protocols-types.js';

Expand Down Expand Up @@ -152,22 +153,17 @@ export class ProtocolAuthorization {
);
}

/**
* Performs protocol-based authorization against the incoming RecordsQuery message.
* @throws {Error} if authorization fails.
*/
public static async authorizeQuery(
public static async authorizeQueryOrSubscribe(
tenant: string,
incomingMessage: RecordsQuery,
incomingMessage: RecordsQuery | RecordsSubscribe,
messageStore: MessageStore,
): Promise<void> {
// validate that required properties exist in query filter
const { protocol, protocolPath, contextId } = incomingMessage.message.descriptor.filter;

// fetch the protocol definition
const protocolDefinition = await ProtocolAuthorization.fetchProtocolDefinition(
tenant,
protocol!, // authorizeQuery` is only called if `protocol` is present
protocol!, // `authorizeQueryOrSubscribe` is only called if `protocol` is present
messageStore,
);

Expand All @@ -192,7 +188,7 @@ export class ProtocolAuthorization {
tenant,
incomingMessage,
inboundMessageRuleSet,
[], // ancestor chain is not relevant to queries
[], // ancestor chain is not relevant to subscriptions
messageStore,
);
}
Expand Down Expand Up @@ -423,7 +419,7 @@ export class ProtocolAuthorization {
*/
private static async verifyInvokedRole(
tenant: string,
incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsWrite,
incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsSubscribe | RecordsWrite,
protocolUri: string,
contextId: string | undefined,
protocolDefinition: ProtocolDefinition,
Expand Down Expand Up @@ -481,7 +477,7 @@ export class ProtocolAuthorization {
*/
private static async getActionsSeekingARuleMatch(
tenant: string,
incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsWrite,
incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsSubscribe | RecordsWrite,
messageStore: MessageStore,
): Promise<ProtocolAction[]> {

Expand All @@ -495,6 +491,9 @@ export class ProtocolAuthorization {
case DwnMethodName.Read:
return [ProtocolAction.Read];

case DwnMethodName.Subscribe:
return [ProtocolAction.Subscribe];

case DwnMethodName.Write:
const incomingRecordsWrite = incomingMessage as RecordsWrite;
if (await incomingRecordsWrite.isInitialWrite()) {
Expand All @@ -519,7 +518,7 @@ export class ProtocolAuthorization {
*/
private static async verifyAllowedActions(
tenant: string,
incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsWrite,
incomingMessage: RecordsDelete | RecordsQuery | RecordsRead | RecordsSubscribe | RecordsWrite,
inboundMessageRuleSet: ProtocolRuleSet,
ancestorMessageChain: RecordsWriteMessage[],
messageStore: MessageStore,
Expand Down
24 changes: 12 additions & 12 deletions src/core/records-grant-authorization.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { MessageStore } from '../types/message-store.js';
import type { RecordsPermissionScope } from '../types/permissions-grant-descriptor.js';
import type { PermissionsGrantMessage, RecordsPermissionsGrantMessage } from '../types/permissions-types.js';
import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsReadMessage, RecordsWriteMessage } from '../types/records-types.js';
import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsReadMessage, RecordsSubscribeMessage, RecordsWriteMessage } from '../types/records-types.js';

import { GrantAuthorization } from './grant-authorization.js';
import { PermissionsConditionPublication } from '../types/permissions-grant-descriptor.js';
Expand Down Expand Up @@ -63,35 +63,35 @@ export class RecordsGrantAuthorization {
}

/**
* Authorizes the scope of a PermissionsGrant for RecordsQuery.
* Authorizes the scope of a PermissionsGrant for RecordsQuery or RecordsSubscribe.
* @param messageStore Used to check if the grant has been revoked.
*/
public static async authorizeQuery(input: {
recordsQueryMessage: RecordsQueryMessage,
public static async authorizeQueryOrSubscribe(input: {
incomingMessage: RecordsQueryMessage | RecordsSubscribeMessage,
expectedGrantedToInGrant: string,
expectedGrantedForInGrant: string,
permissionsGrantMessage: PermissionsGrantMessage,
messageStore: MessageStore,
}): Promise<void> {
const {
recordsQueryMessage, expectedGrantedToInGrant, expectedGrantedForInGrant, permissionsGrantMessage, messageStore
incomingMessage, expectedGrantedToInGrant, expectedGrantedForInGrant, permissionsGrantMessage, messageStore
} = input;

await GrantAuthorization.performBaseValidation({
incomingMessage: recordsQueryMessage,
incomingMessage,
expectedGrantedToInGrant,
expectedGrantedForInGrant,
permissionsGrantMessage,
messageStore
});

// If the grant specifies a protocol, the query must specify the same protocol.
const protocolInGrant = (permissionsGrantMessage as RecordsPermissionsGrantMessage).descriptor.scope.protocol;
const protocolInQuery = recordsQueryMessage.descriptor.filter.protocol;
if (protocolInGrant !== undefined && protocolInQuery !== protocolInGrant) {
// If the grant specifies a protocol, the subscribe or query must specify the same protocol.
const protocolInGrant = (permissionsGrantMessage.descriptor.scope as RecordsPermissionScope).protocol;
const protocolInMessage = incomingMessage.descriptor.filter.protocol;
if (protocolInGrant !== undefined && protocolInMessage !== protocolInGrant) {
throw new DwnError(
DwnErrorCode.RecordsGrantAuthorizationQueryProtocolScopeMismatch,
`Grant protocol scope ${protocolInGrant} does not match protocol in query ${protocolInQuery}`
DwnErrorCode.RecordsGrantAuthorizationQueryOrSubscribeProtocolScopeMismatch,
`Grant protocol scope ${protocolInGrant} does not match protocol in message ${protocolInMessage}`
);
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/dwn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import type { EventLog } from './types/event-log.js';
import type { EventStream } from './types/subscriptions.js';
import type { MessageStore } from './types/message-store.js';
import type { MethodHandler } from './types/method-handler.js';
import type { Readable } from 'readable-stream';
import type { TenantGate } from './core/tenant-gate.js';
import type { UnionMessageReply } from './core/message-reply.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply } from './types/events-types.js';
import type { GenericMessage, GenericMessageReply, MessageOptions } from './types/message-types.js';
import type { GenericMessage, GenericMessageReply, MessageSubscriptionHandler } from './types/message-types.js';
import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js';
import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from './types/permissions-types.js';
import type { ProtocolsConfigureMessage, ProtocolsQueryMessage, ProtocolsQueryReply } from './types/protocols-types.js';
import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsReadMessage, RecordsReadReply, RecordsWriteMessage, RecordsWriteMessageOptions } from './types/records-types.js';
import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsReadMessage, RecordsReadReply, RecordsSubscribeMessage, RecordsSubscribeMessageOptions, RecordsSubscribeReply, RecordSubscriptionHandler, RecordsWriteMessage, RecordsWriteMessageOptions } from './types/records-types.js';

import { AllowAllTenantGate } from './core/tenant-gate.js';
import { DidResolver } from './did/did-resolver.js';
Expand All @@ -28,6 +29,7 @@ import { ProtocolsQueryHandler } from './handlers/protocols-query.js';
import { RecordsDeleteHandler } from './handlers/records-delete.js';
import { RecordsQueryHandler } from './handlers/records-query.js';
import { RecordsReadHandler } from './handlers/records-read.js';
import { RecordsSubscribeHandler } from './handlers/records-subscribe.js';
import { RecordsWriteHandler } from './handlers/records-write.js';
import { DwnInterfaceName, DwnMethodName } from './enums/dwn-interface-method.js';

Expand All @@ -43,6 +45,7 @@ export class Dwn {
private constructor(config: DwnConfig) {
this.didResolver = config.didResolver!;
this.tenantGate = config.tenantGate!;
this.eventStream = config.eventStream!;
this.messageStore = config.messageStore;
this.dataStore = config.dataStore;
this.eventLog = config.eventLog;
Expand Down Expand Up @@ -112,6 +115,11 @@ export class Dwn {
this.messageStore,
this.dataStore
),
[DwnInterfaceName.Records + DwnMethodName.Subscribe]: new RecordsSubscribeHandler(
this.didResolver,
this.messageStore,
this.eventStream
),
[DwnInterfaceName.Records + DwnMethodName.Write]: new RecordsWriteHandler(
this.didResolver,
this.messageStore,
Expand Down Expand Up @@ -165,6 +173,8 @@ export class Dwn {
public async processMessage(tenant: string, rawMessage: PermissionsRevokeMessage): Promise<GenericMessageReply>;
public async processMessage(tenant: string, rawMessage: RecordsDeleteMessage): Promise<GenericMessageReply>;
public async processMessage(tenant: string, rawMessage: RecordsQueryMessage): Promise<RecordsQueryReply>;
public async processMessage(
tenant: string, rawMessage: RecordsSubscribeMessage, options: RecordsSubscribeMessageOptions): Promise<RecordsSubscribeReply>;
public async processMessage(tenant: string, rawMessage: RecordsReadMessage): Promise<RecordsReadReply>;
public async processMessage(tenant: string, rawMessage: RecordsWriteMessage, options?: RecordsWriteMessageOptions): Promise<GenericMessageReply>;
public async processMessage(tenant: string, rawMessage: unknown, options?: MessageOptions): Promise<UnionMessageReply>;
Expand Down Expand Up @@ -233,6 +243,14 @@ export class Dwn {
}
};

/**
* MessageOptions that are used when processing a message.
*/
export interface MessageOptions {
dataStream?: Readable;
subscriptionHandler?: MessageSubscriptionHandler | RecordSubscriptionHandler;
};

/**
* DWN configuration.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/events-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ export class EventsSubscribeHandler implements MethodHandler {
subscription,
};
}
}
}
Loading

0 comments on commit 8f6e01e

Please sign in to comment.