Skip to content

Commit

Permalink
Added docs and switched to debug logging for compat reasons
Browse files Browse the repository at this point in the history
  • Loading branch information
doughtnerd committed Sep 7, 2023
1 parent fca3b7e commit 8d5741e
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 20 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"test:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from unit unit",
"test-coverage:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from coverage coverage"
},
"version": "1.8.2",
"version": "1.8.3",
"main": "dist/index.js",
"author": "Chris Carlson",
"license": "MIT",
Expand Down
4 changes: 2 additions & 2 deletions src/message-db-client/stream-subscriber-position.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ export async function loadStreamSubscriberPosition(

if (lastMessage) {
const position = lastMessage.data.streamPosition;
logger.log(`Successfully loaded position: ${position} for subscriber: ${subscriberId}`);
logger.debug(`Successfully loaded position: ${position} for subscriber: ${subscriberId}`);
return position;
} else {
logger.log(`Failed to load position for subscriber: ${subscriberId}, using default: ${defaultPosition}`);
logger.debug(`Failed to load position for subscriber: ${subscriberId}, using default: ${defaultPosition}`);
return defaultPosition;
}
}
Expand Down
39 changes: 24 additions & 15 deletions src/message-store/message-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,28 @@ export class MessageStore implements IMessageStore {

constructor(private client: IMessageDBClient, private logger: Logger = NoopLogger) {}

/**
* Tells the message store to subscribe to a particular stream category.
* When a message for that category is received, the message will be dispatched to the appropriate handler (if one exists).
*
* @param subscriberId Unique id for the subscriber. This is used for position tracking and should be unique to a message store.
* @param streamName The name of the stream category to subscribe to (e.g. 'shoppingCart:command').
* @param handlers An object containing message handlers for each message type.
* @param options Options for the subscription.
*/
async subscribeToCategory(subscriberId: string, streamName: string, handlers: MessageHandlers, options: SubscribeToCategoryOptions): Promise<Subscription> {
const { pollingInterval = 1000, positionUpdateInterval = 100, retries = 1, ...remainingOptions } = options;
let { startingPosition = 0 } = options;

let position: number = await loadStreamSubscriberPosition(this.client, streamName, subscriberId, this.logger, startingPosition);
let lastSavedPosition: number = position;

let shouldUnsubscribe = false;

let unsubscribe = () => {
shouldUnsubscribe = true;
};

const poll: () => Promise<boolean> = async () => {
const messages = await this.client.getCategoryMessages(streamName, { startingPosition: position, ...remainingOptions });
for (const message of messages) {
Expand All @@ -39,10 +48,10 @@ export class MessageStore implements IMessageStore {
lastSavedPosition = position;
}
}

return true;
};

const poller = promisePoller({
taskFn: poll,
interval: pollingInterval,
Expand All @@ -53,43 +62,43 @@ export class MessageStore implements IMessageStore {
return resolvedValue ? true : false;
},
});

// This is kinda weird check logic that needs to happen for promisePoller library on cancelled subscriptions
poller.then().catch((e) => {
if (e instanceof Array) {
this.logger.log("Subscription Closed");
this.logger.debug("Subscription Closed");
} else {
throw e;
}
});

return { unsubscribe };
}

async project<EntityType, MessageTypes extends Message>(streamName: string, entityProjection: Projection<EntityType, MessageTypes>, options?: ProjectOptions): Promise<EntityType> {
options = options ?? {};
let { startingPosition = 0 } = options;
const { batchSize, condition } = options;

let entity = initializeEntity<EntityType, MessageTypes>(entityProjection);

const latestStreamVersion = await this.client.getStreamVersion(streamName);

if (latestStreamVersion.streamVersion === null) {
return entity;
}

const parsedStreamVersion = latestStreamVersion.streamVersion;

while (startingPosition <= parsedStreamVersion) {
const messages = await this.client.getStreamMessages(streamName, { startingPosition, batchSize, condition });
for (const message of messages) {
entity = doProjection<EntityType, MessageTypes>(entity, message, entityProjection);
}

startingPosition += messages.length;
}

return entity;
}

Expand Down
19 changes: 19 additions & 0 deletions src/types/entity-projection.type.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { Message } from "./message.type";
import { Serializeable } from "./serializeable.type";

/**
* Function called by the projection runner to handle a message for an entity.
* @param entity The entity to handle the message for.
* @param message The message to handle.
* @returns The updated entity.
*/
export type ProjectionHandlerFunc<
EntityType,
MessageType extends Message
Expand All @@ -9,8 +15,21 @@ export type ProjectionHandlerFunc<
export type EntityInitFn<T> = () => T;

export type Projection<EntityType = any, MessageTypes extends Message = Message> = {
/** Used in debugging to verify the projection running. */
projectionName: string;
/** The initial state of the entity. Can either be a function or a constant. */
entity: EntityType | EntityInitFn<EntityType>
/**
* Handlers for each available message type for the given entity.
* Used to bring the entity back to current state.
* @example
* handlers: {
* ItemAddedToCart: (entity: ShoppingCart, message: ItemAddedToCart) => {
* entity.addItem(message.data.item);
* return entity;
* },
* }
*/
handlers: Partial<{
[Property in MessageTypes['type'] as Extract<Property, string>]: ProjectionHandlerFunc<EntityType, Extract<MessageTypes, Message<Serializeable, Property>>>
}>;
Expand Down
41 changes: 41 additions & 0 deletions src/types/message-handler.type.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,52 @@
import { IMessageStore } from "./message-store.interface";
import { Message } from "./message.type";

/**
* Context passed to a message handler.
* Used to access the message store or logger.
*/
export type MessageHandlerContext = {
logger: any;
messageStore: IMessageStore;
};

/**
* A function that handles a particular message.
* Called by the message dispatcher when a message is received during a subscription.
*
* @param message The message being handled.
* @param context The context of the message handler.
* @returns A promise that resolves when the message has been handled.
*
* @example
* const addItemHandler: MessageHandlerFunc = async (message, context) => {
* // Validate the message body.
* if (!message.data.item) {
* return;
*
* // Calculate current state
* const currentState = await messageStore.project<ShoppingCart>('cart-123', shoppingCartProjection);
*
* // Handle the message
* try {
* const { item } = message.data;
* currentState.addItem(item);
* await context.messageStore.writeMessage('cart-123', {
* id: uuid(),
* type: 'ItemAddedToCart',
* data: { item },
* metadata: message.metadata,
* });
* } catch (error) {
* await context.messageStore.writeMessage('cart-123', {
* id: uuid(),
* type: 'AddItemFailed',
* data: { error },
* metadata: message.metadata,
* });
* }
* }
*/
export type MessageHandlerFunc<
T extends Message = Message
> = (message: T, context: MessageHandlerContext) => Promise<void>;
29 changes: 28 additions & 1 deletion src/types/message.type.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,47 @@
import {Serializeable} from "./serializeable.type";
import { Serializeable } from "./serializeable.type";

export type Message<T extends Serializeable = {}, K extends string = string> = {
/** Unique identifier for the message. */
id: string;

/** The name of the message (e.g. ItemAddedToCart or AddItemToCart). */
type: K;

/** The stream position of the message */
position: number;

/** The global position of the message */
globalPosition: number;

/** The name of the stream that the message was written to. */
streamName: string;

/** The data of the message. */
data: T;

metadata: {
/** The name of the stream that a reply message should be sent to. */
replyStreamName?: string;

/** When using event workflows, this indicates the name of the stream that started to workflow. */
correlationStreamName?: string;

/** Stream position of the message that caused this message to be written. */
causationMessagePosition?: string;

/** The global position of the message that caused this message to be written. */
causationMessageGlobalPosition?: string;

/** Name of the stream that contains the message that caused this message to be written. */
causationMessageStreamName?: string;

/** Id that can be used to trace messages related to eachother. */
traceId?: string;

/** The user that initiated the message. */
userId?: string;

/** The version of the message. */
schemaVersion?: string;
};
time: Date;
Expand Down
1 change: 0 additions & 1 deletion src/types/type-predicate.type.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@

export type TypePredicate<T> = (data: unknown) => data is T;

0 comments on commit 8d5741e

Please sign in to comment.