Skip to content

Commit

Permalink
MessageStore Index tenant partitioning (#547)
Browse files Browse the repository at this point in the history
* failing test

* partition MessageStore indexing
  • Loading branch information
LiranCohen authored Oct 6, 2023
1 parent 11caeb4 commit d30914a
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 81 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Here's to a thrilling Hacktoberfest voyage with us! 🎉
# Decentralized Web Node (DWN) SDK <!-- omit in toc -->

Code Coverage
![Statements](https://img.shields.io/badge/statements-98.37%25-brightgreen.svg?style=flat) ![Branches](https://img.shields.io/badge/branches-95.25%25-brightgreen.svg?style=flat) ![Functions](https://img.shields.io/badge/functions-95.7%25-brightgreen.svg?style=flat) ![Lines](https://img.shields.io/badge/lines-98.37%25-brightgreen.svg?style=flat)
![Statements](https://img.shields.io/badge/statements-98.37%25-brightgreen.svg?style=flat) ![Branches](https://img.shields.io/badge/branches-95.11%25-brightgreen.svg?style=flat) ![Functions](https://img.shields.io/badge/functions-95.7%25-brightgreen.svg?style=flat) ![Lines](https://img.shields.io/badge/lines-98.37%25-brightgreen.svg?style=flat)

- [Introduction](#introduction)
- [Installation](#installation)
Expand Down
37 changes: 21 additions & 16 deletions src/store/index-level.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Filter, RangeFilter } from '../types/message-types.js';
import type { LevelWrapperBatchOperation, LevelWrapperIteratorOptions } from './level-wrapper.js';

import { executeUnlessAborted } from '../index.js';
import { flatten } from '../utils/object.js';
import { createLevelDatabase, LevelWrapper } from './level-wrapper.js';

Expand Down Expand Up @@ -38,11 +39,12 @@ export class IndexLevel {
* @param dataId ID of the data/object/content being indexed.
*/
async put(
tenant: string,
dataId: string,
indexes: { [property: string]: unknown },
options?: IndexLevelOptions
): Promise<void> {

const partition = await executeUnlessAborted(this.db.partition(tenant), options?.signal);
indexes = flatten(indexes);

const operations: LevelWrapperBatchOperation<string>[] = [ ];
Expand Down Expand Up @@ -70,13 +72,13 @@ export class IndexLevel {
// we can consider putting this info in a different data partition if this ever becomes more complex/confusing
operations.push({ type: 'put', key: `__${dataId}__indexes`, value: JSON.stringify(indexes) });

await this.db.batch(operations, options);
await partition.batch(operations, options);
}

/**
* Executes the given single filter query and appends the results without duplicate into `matchedIDs`.
*/
private async executeSingleFilterQuery(filter: Filter, matchedIDs: Set<string>, options?: IndexLevelOptions): Promise<void> {
private async executeSingleFilterQuery(tenant: string, filter: Filter, matchedIDs: Set<string>, options?: IndexLevelOptions): Promise<void> {
// Note: We have an array of Promises in order to support OR (anyOf) matches when given a list of accepted values for a property
const propertyNameToPromises: { [key: string]: Promise<string[]>[] } = {};

Expand All @@ -93,17 +95,17 @@ export class IndexLevel {
// then adding them to the promises associated with `propertyName`
propertyNameToPromises[propertyName] = [];
for (const propertyValue of new Set(propertyFilter)) {
const exactMatchesPromise = this.findExactMatches(propertyName, propertyValue, options);
const exactMatchesPromise = this.findExactMatches(tenant, propertyName, propertyValue, options);
propertyNameToPromises[propertyName].push(exactMatchesPromise);
}
} else {
// `propertyFilter` is a `RangeFilter`
const rangeMatchesPromise = this.findRangeMatches(propertyName, propertyFilter, options);
const rangeMatchesPromise = this.findRangeMatches(tenant, propertyName, propertyFilter, options);
propertyNameToPromises[propertyName] = [rangeMatchesPromise];
}
} else {
// propertyFilter is an EqualFilter, meaning it is a non-object primitive type
const exactMatchesPromise = this.findExactMatches(propertyName, propertyFilter, options);
const exactMatchesPromise = this.findExactMatches(tenant, propertyName, propertyFilter, options);
propertyNameToPromises[propertyName] = [exactMatchesPromise];
}
}
Expand Down Expand Up @@ -138,18 +140,19 @@ export class IndexLevel {
}
}

async query(filters: Filter[], options?: IndexLevelOptions): Promise<Array<string>> {
async query(tenant: string, filters: Filter[], options?: IndexLevelOptions): Promise<Array<string>> {
const matchedIDs: Set<string> = new Set();

for (const filter of filters) {
await this.executeSingleFilterQuery(filter, matchedIDs, options);
await this.executeSingleFilterQuery(tenant, filter, matchedIDs, options);
}

return [...matchedIDs];
}

async delete(dataId: string, options?: IndexLevelOptions): Promise<void> {
const serializedIndexes = await this.db.get(`__${dataId}__indexes`, options);
async delete(tenant: string, dataId: string, options?: IndexLevelOptions): Promise<void> {
const partition = await executeUnlessAborted(this.db.partition(tenant), options?.signal);
const serializedIndexes = await partition.get(`__${dataId}__indexes`, options);
if (!serializedIndexes) {
return;
}
Expand All @@ -166,7 +169,7 @@ export class IndexLevel {

ops.push({ type: 'del', key: `__${dataId}__indexes` });

await this.db.batch(ops, options);
await partition.batch(ops, options);
}

async clear(): Promise<void> {
Expand All @@ -176,15 +179,16 @@ export class IndexLevel {
/**
* @returns IDs of data that matches the exact property and value.
*/
private async findExactMatches(propertyName: string, propertyValue: unknown, options?: IndexLevelOptions): Promise<string[]> {
private async findExactMatches(tenant: string, propertyName: string, propertyValue: unknown, options?: IndexLevelOptions): Promise<string[]> {
const partition = await executeUnlessAborted(this.db.partition(tenant), options?.signal);
const propertyValuePrefix = this.join(propertyName, this.encodeValue(propertyValue), '');

const iteratorOptions: LevelWrapperIteratorOptions<string> = {
gt: propertyValuePrefix
};

const matches: string[] = [];
for await (const [ key, dataId ] of this.db.iterator(iteratorOptions, options)) {
for await (const [ key, dataId ] of partition.iterator(iteratorOptions, options)) {
if (!key.startsWith(propertyValuePrefix)) {
break;
}
Expand All @@ -197,7 +201,8 @@ export class IndexLevel {
/**
* @returns IDs of data that matches the range filter.
*/
private async findRangeMatches(propertyName: string, rangeFilter: RangeFilter, options?: IndexLevelOptions): Promise<string[]> {
private async findRangeMatches(tenant: string, propertyName: string, rangeFilter: RangeFilter, options?: IndexLevelOptions): Promise<string[]> {
const partition = await executeUnlessAborted(this.db.partition(tenant), options?.signal);
const iteratorOptions: LevelWrapperIteratorOptions<string> = {};

for (const comparator in rangeFilter) {
Expand All @@ -212,7 +217,7 @@ export class IndexLevel {
}

const matches: string[] = [];
for await (const [ key, dataId ] of this.db.iterator(iteratorOptions, options)) {
for await (const [ key, dataId ] of partition.iterator(iteratorOptions, options)) {
// if "greater-than" is specified, skip all keys that contains the exact value given in the "greater-than" condition
if ('gt' in rangeFilter && IndexLevel.extractValueFromKey(key) === this.encodeValue(rangeFilter.gt)) {
continue;
Expand All @@ -232,7 +237,7 @@ export class IndexLevel {
// key = 'dateCreated\u0000"2023-05-25T11:22:33.000000Z"\u0000bafyreigs3em7lrclhntzhgvkrf75j2muk6e7ypq3lrw3ffgcpyazyw6pry'
// the value would be considered greater than { lte: `dateCreated\u0000"2023-05-25T11:22:33.000000Z"` } used in the iterator options,
// thus would not be included in the iterator even though we'd like it to be.
for (const dataId of await this.findExactMatches(propertyName, rangeFilter.lte, options)) {
for (const dataId of await this.findExactMatches(tenant, propertyName, rangeFilter.lte, options)) {
matches.push(dataId);
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/store/message-store-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ export class MessageStoreLevel implements MessageStore {
options?.signal?.throwIfAborted();

const messages: GenericMessage[] = [];
const resultIds = await this.index.query(filters.map(f => ({ ...f, tenant })), options);
// note: injecting tenant into filters to allow querying with an "empty" filter.
// if there are no other filters present it will return all the messages the tenant.
const resultIds = await this.index.query(tenant, filters.map(f => ({ ...f, tenant })), options);

// as an optimization for large data sets, we are finding the message object which matches the paginationMessageCid here.
// we can use this within the pagination function after sorting to determine the starting point of the array in a more efficient way.
Expand Down Expand Up @@ -235,7 +237,7 @@ export class MessageStoreLevel implements MessageStore {

const cid = CID.parse(cidString);
await partition.delete(cid, options);
await this.index.delete(cidString, options);
await this.index.delete(tenant, cidString, options);
}

async put(
Expand All @@ -255,11 +257,15 @@ export class MessageStoreLevel implements MessageStore {
await partition.put(messageCid, encodedMessageBlock.bytes, options);

const messageCidString = messageCid.toString();

// note: leaving the additional tenant indexing to allow for querying with an "empty" filter.
// when querying, we also inject a filter for the specific tenant.
// if there are no other filters present it will return all the messages for that tenant.
const indexDocument = {
...indexes,
tenant,
};
await this.index.put(messageCidString, indexDocument, options);
await this.index.put(tenant, messageCidString, indexDocument, options);
}

/**
Expand Down
2 changes: 0 additions & 2 deletions tests/event-log/event-log-level.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ describe('EventLogLevel Tests', () => {
expect(events.length).to.equal(1);
expect(events[0].watermark).to.equal(watermark2);
expect(events[0].messageCid).to.equal(messageCid2);


});

it('returns events in the order that they were appended', async () => {
Expand Down
Loading

0 comments on commit d30914a

Please sign in to comment.