diff --git a/package.json b/package.json index eceba11..da77e2e 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "test:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from unit unit", "test-coverage:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from coverage coverage" }, - "version": "1.8.2", + "version": "1.8.3", "main": "dist/index.js", "author": "Chris Carlson", "license": "MIT", diff --git a/src/message-db-client/stream-subscriber-position.ts b/src/message-db-client/stream-subscriber-position.ts index fdab7ea..34762f6 100644 --- a/src/message-db-client/stream-subscriber-position.ts +++ b/src/message-db-client/stream-subscriber-position.ts @@ -17,10 +17,10 @@ export async function loadStreamSubscriberPosition( if (lastMessage) { const position = lastMessage.data.streamPosition; - logger.log(`Successfully loaded position: ${position} for subscriber: ${subscriberId}`); + logger.debug(`Successfully loaded position: ${position} for subscriber: ${subscriberId}`); return position; } else { - logger.log(`Failed to load position for subscriber: ${subscriberId}, using default: ${defaultPosition}`); + logger.debug(`Failed to load position for subscriber: ${subscriberId}, using default: ${defaultPosition}`); return defaultPosition; } } diff --git a/src/message-store/message-store.ts b/src/message-store/message-store.ts index 05413bc..2187155 100644 --- a/src/message-store/message-store.ts +++ b/src/message-store/message-store.ts @@ -8,19 +8,28 @@ export class MessageStore implements IMessageStore { constructor(private client: IMessageDBClient, private logger: Logger = NoopLogger) {} + /** + * Tells the message store to subscribe to a particular stream category. + * When a message for that category is received, the message will be dispatched to the appropriate handler (if one exists). + * + * @param subscriberId Unique id for the subscriber. This is used for position tracking and should be unique to a message store. + * @param streamName The name of the stream category to subscribe to (e.g. 'shoppingCart:command'). + * @param handlers An object containing message handlers for each message type. + * @param options Options for the subscription. + */ async subscribeToCategory(subscriberId: string, streamName: string, handlers: MessageHandlers, options: SubscribeToCategoryOptions): Promise { const { pollingInterval = 1000, positionUpdateInterval = 100, retries = 1, ...remainingOptions } = options; let { startingPosition = 0 } = options; - + let position: number = await loadStreamSubscriberPosition(this.client, streamName, subscriberId, this.logger, startingPosition); let lastSavedPosition: number = position; let shouldUnsubscribe = false; - + let unsubscribe = () => { shouldUnsubscribe = true; }; - + const poll: () => Promise = async () => { const messages = await this.client.getCategoryMessages(streamName, { startingPosition: position, ...remainingOptions }); for (const message of messages) { @@ -39,10 +48,10 @@ export class MessageStore implements IMessageStore { lastSavedPosition = position; } } - + return true; }; - + const poller = promisePoller({ taskFn: poll, interval: pollingInterval, @@ -53,16 +62,16 @@ export class MessageStore implements IMessageStore { return resolvedValue ? true : false; }, }); - + // This is kinda weird check logic that needs to happen for promisePoller library on cancelled subscriptions poller.then().catch((e) => { if (e instanceof Array) { - this.logger.log("Subscription Closed"); + this.logger.debug("Subscription Closed"); } else { throw e; } }); - + return { unsubscribe }; } @@ -70,26 +79,26 @@ export class MessageStore implements IMessageStore { options = options ?? {}; let { startingPosition = 0 } = options; const { batchSize, condition } = options; - + let entity = initializeEntity(entityProjection); - + const latestStreamVersion = await this.client.getStreamVersion(streamName); - + if (latestStreamVersion.streamVersion === null) { return entity; } - + const parsedStreamVersion = latestStreamVersion.streamVersion; - + while (startingPosition <= parsedStreamVersion) { const messages = await this.client.getStreamMessages(streamName, { startingPosition, batchSize, condition }); for (const message of messages) { entity = doProjection(entity, message, entityProjection); } - + startingPosition += messages.length; } - + return entity; } diff --git a/src/types/entity-projection.type.ts b/src/types/entity-projection.type.ts index c53a030..475ac28 100644 --- a/src/types/entity-projection.type.ts +++ b/src/types/entity-projection.type.ts @@ -1,6 +1,12 @@ import { Message } from "./message.type"; import { Serializeable } from "./serializeable.type"; +/** +* Function called by the projection runner to handle a message for an entity. +* @param entity The entity to handle the message for. +* @param message The message to handle. +* @returns The updated entity. +*/ export type ProjectionHandlerFunc< EntityType, MessageType extends Message @@ -9,8 +15,21 @@ export type ProjectionHandlerFunc< export type EntityInitFn = () => T; export type Projection = { + /** Used in debugging to verify the projection running. */ projectionName: string; + /** The initial state of the entity. Can either be a function or a constant. */ entity: EntityType | EntityInitFn + /** + * Handlers for each available message type for the given entity. + * Used to bring the entity back to current state. + * @example + * handlers: { + * ItemAddedToCart: (entity: ShoppingCart, message: ItemAddedToCart) => { + * entity.addItem(message.data.item); + * return entity; + * }, + * } + */ handlers: Partial<{ [Property in MessageTypes['type'] as Extract]: ProjectionHandlerFunc>> }>; diff --git a/src/types/message-handler.type.ts b/src/types/message-handler.type.ts index c22cc4c..163f5bb 100644 --- a/src/types/message-handler.type.ts +++ b/src/types/message-handler.type.ts @@ -1,11 +1,52 @@ import { IMessageStore } from "./message-store.interface"; import { Message } from "./message.type"; +/** +* Context passed to a message handler. +* Used to access the message store or logger. +*/ export type MessageHandlerContext = { logger: any; messageStore: IMessageStore; }; +/** +* A function that handles a particular message. +* Called by the message dispatcher when a message is received during a subscription. +* +* @param message The message being handled. +* @param context The context of the message handler. +* @returns A promise that resolves when the message has been handled. +* +* @example +* const addItemHandler: MessageHandlerFunc = async (message, context) => { +* // Validate the message body. +* if (!message.data.item) { +* return; +* +* // Calculate current state +* const currentState = await messageStore.project('cart-123', shoppingCartProjection); +* +* // Handle the message +* try { +* const { item } = message.data; +* currentState.addItem(item); +* await context.messageStore.writeMessage('cart-123', { +* id: uuid(), +* type: 'ItemAddedToCart', +* data: { item }, +* metadata: message.metadata, +* }); +* } catch (error) { +* await context.messageStore.writeMessage('cart-123', { +* id: uuid(), +* type: 'AddItemFailed', +* data: { error }, +* metadata: message.metadata, +* }); +* } +* } +*/ export type MessageHandlerFunc< T extends Message = Message > = (message: T, context: MessageHandlerContext) => Promise; diff --git a/src/types/message.type.ts b/src/types/message.type.ts index da294db..ae8b2b4 100644 --- a/src/types/message.type.ts +++ b/src/types/message.type.ts @@ -1,20 +1,47 @@ -import {Serializeable} from "./serializeable.type"; +import { Serializeable } from "./serializeable.type"; export type Message = { + /** Unique identifier for the message. */ id: string; + + /** The name of the message (e.g. ItemAddedToCart or AddItemToCart). */ type: K; + + /** The stream position of the message */ position: number; + + /** The global position of the message */ globalPosition: number; + + /** The name of the stream that the message was written to. */ streamName: string; + + /** The data of the message. */ data: T; + metadata: { + /** The name of the stream that a reply message should be sent to. */ replyStreamName?: string; + + /** When using event workflows, this indicates the name of the stream that started to workflow. */ correlationStreamName?: string; + + /** Stream position of the message that caused this message to be written. */ causationMessagePosition?: string; + + /** The global position of the message that caused this message to be written. */ causationMessageGlobalPosition?: string; + + /** Name of the stream that contains the message that caused this message to be written. */ causationMessageStreamName?: string; + + /** Id that can be used to trace messages related to eachother. */ traceId?: string; + + /** The user that initiated the message. */ userId?: string; + + /** The version of the message. */ schemaVersion?: string; }; time: Date; diff --git a/src/types/type-predicate.type.ts b/src/types/type-predicate.type.ts index 5449cbd..385be2d 100644 --- a/src/types/type-predicate.type.ts +++ b/src/types/type-predicate.type.ts @@ -1,2 +1 @@ - export type TypePredicate = (data: unknown) => data is T;