Skip to content

Commit

Permalink
feat(#9238): add functionality of getting people as an AsyncGenerator…
Browse files Browse the repository at this point in the history
… in cht-datasource (#9281)
  • Loading branch information
sugat009 authored Aug 12, 2024
1 parent 8396fb8 commit bf8a77d
Show file tree
Hide file tree
Showing 12 changed files with 292 additions and 73 deletions.
3 changes: 2 additions & 1 deletion shared-libs/cht-datasource/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ module.exports = {
MethodDefinition: true,
},
publicOnly: true,
}]
}],
['jsdoc/check-tag-names']: ['error', { definedTags: ['typeParam'] }],
}
}
]
Expand Down
17 changes: 13 additions & 4 deletions shared-libs/cht-datasource/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,25 @@ export const getDatasource = (ctx: DataContext) => {
/**
* Returns an array of people for the provided page specifications.
* @param personType the type of people to return
* @param cursor a value representing the index of the first person to return
* @param limit the maximum number of people to return. Default is 100.
* @param skip the number of people to skip. Default is 0.
* @returns an array of people for the provided page specifications
* @throws Error if no type is provided or if the type is not for a person
* @throws Error if the provided limit is `<= 0`
* @throws Error if the provided skip is `< 0`
* @throws Error if the provided cursor is `< 0`
* @see {@link getByType} which provides the same data, but without having to manually account for paging
*/
getPageByType: (personType: string, limit = 100, skip = 0) => ctx.bind(Person.v1.getPage)(
Qualifier.byContactType(personType), limit, skip
getPageByType: (personType: string, cursor = '0', limit = 100) => ctx.bind(Person.v1.getPage)(
Qualifier.byContactType(personType), cursor, limit
),

/**
* Returns a generator for fetching all people with the given type.
* @param personType the type of people to return
* @returns a generator for fetching all people with the given type
* @throws Error if no type is provided or if the type is not for a person
*/
getByType: (personType: string) => ctx.bind(Person.v1.getAll)(Qualifier.byContactType(personType)),
}
}
};
Expand Down
7 changes: 6 additions & 1 deletion shared-libs/cht-datasource/src/libs/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ export abstract class AbstractDataContext implements DataContext {
readonly bind = <T>(fn: (ctx: DataContext) => T): T => fn(this);
}

/** @internal */
/**
* Represents a page of results. The `data` array contains the results for this page. The `cursor` field contains a
* key that can be used to fetch the next page of results. If no `cursor` value is returned, there are no additional
* results available. (Note that no assumptions should be made about the _contents_ of the cursor string.)
* @typeParam T the type of the data in the page
*/
export interface Page<T> {
readonly data: T[];
readonly cursor: string;
Expand Down
22 changes: 21 additions & 1 deletion shared-libs/cht-datasource/src/libs/data-context.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { hasField, isRecord } from './core';
import { hasField, isRecord, Page } from './core';
import { isLocalDataContext, LocalDataContext } from '../local/libs/data-context';
import { assertRemoteDataContext, isRemoteDataContext, RemoteDataContext } from '../remote/libs/data-context';

Expand Down Expand Up @@ -39,3 +39,23 @@ export const adapt = <T>(
assertRemoteDataContext(context);
return remote(context);
};

/** @internal */
export const getDocumentStream = async function* <S, T>(
fetchFunction: (args: S, s: string, l: number) => Promise<Page<T>>,
fetchFunctionArgs: S
): AsyncGenerator<T, void> {
const limit = 100;
let cursor = '0';
const hasMoreResults = () => cursor !== '-1';

do {
const docs = await fetchFunction(fetchFunctionArgs, cursor, limit);

for (const doc of docs.data) {
yield doc;
}

cursor = docs.cursor;
} while (hasMoreResults());
};
46 changes: 24 additions & 22 deletions shared-libs/cht-datasource/src/local/person.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ export namespace v1 {
export const getPage = ({ medicDb, settings }: LocalDataContext) => {
return async (
personType: ContactTypeQualifier,
cursor: string,
limit: number,
skip: number
): Promise<Page<Person.v1.Person>> => {
const personTypes = contactTypeUtils.getPersonTypes(settings.getAll());
const personTypesIds = personTypes.map((item) => item.id);
Expand All @@ -73,43 +73,45 @@ export namespace v1 {
throw new Error(`Invalid person type: ${personType.contactType}`);
}

// Adding a number skip variable here so as not to confuse ourselves
const skip = Number(cursor) || 0;
const getDocsByPage = queryDocsByKey(medicDb, 'medic-client/contacts_by_type');

const fetchAndFilter = async (
currentLimit: number,
currentSkip: number,
personDocs: Person.v1.Person[],
totalDocsFetched = 0,
currentPersonDocs: Person.v1.Person[] = [],
): Promise<Page<Person.v1.Person>> => {
const docs = await getDocsByPage([personType.contactType], currentLimit, currentSkip);
if (docs.length === 0) {
return { data: personDocs, cursor: '-1' };
}
const noMoreResults = docs.length < currentLimit;
const newPersonDocs = docs.filter((doc): doc is Person.v1.Person => isPerson(settings, doc, doc?._id));
const overFetchCount = currentPersonDocs.length + newPersonDocs.length - limit || 0;
const totalPeople = [...currentPersonDocs, ...newPersonDocs].slice(0, limit);

const tempFilteredDocs = docs.filter((doc): doc is Person.v1.Person => isPerson(settings, doc, doc?._id));
if (noMoreResults) {
return { data: totalPeople, cursor: '-1' };
}

personDocs.push(...tempFilteredDocs);
totalDocsFetched += docs.length;
if (totalPeople.length === limit) {
const nextSkip = currentSkip + currentLimit - overFetchCount;

if (personDocs.length >= limit) {
let cursor: number;
if (docs.length < currentLimit) {
cursor = -1;
} else {
cursor = skip + totalDocsFetched - (personDocs.length - limit);
}
return { data: personDocs.slice(0, limit), cursor: cursor.toString() };
return { data: totalPeople, cursor: nextSkip.toString() };
}

// Re-fetch twice as many docs as we need to limit number of recursions
const missingCount = currentLimit - newPersonDocs.length;
logger.debug(`Found [${missingCount.toString()}] invalid persons. Re-fetching additional records.`);
const nextLimit = missingCount * 2;
const nextSkip = currentSkip + currentLimit;

return fetchAndFilter(
(currentLimit - tempFilteredDocs.length) * 2,
currentSkip + currentLimit,
personDocs,
totalDocsFetched
nextLimit,
nextSkip,
totalPeople,
);
};

return fetchAndFilter(limit, skip, []);
return fetchAndFilter(limit, skip);
};
};
}
50 changes: 36 additions & 14 deletions shared-libs/cht-datasource/src/person.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { isContactTypeQualifier, isUuidQualifier, ContactTypeQualifier, UuidQualifier } from './qualifier';
import { adapt, assertDataContext, DataContext } from './libs/data-context';
import { ContactTypeQualifier, isContactTypeQualifier, isUuidQualifier, UuidQualifier } from './qualifier';
import { adapt, assertDataContext, DataContext, getDocumentStream } from './libs/data-context';
import { Contact, NormalizedParent } from './libs/contact';
import * as Remote from './remote';
import * as Local from './local';
Expand Down Expand Up @@ -47,9 +47,9 @@ export namespace v1 {
}
};

const assertSkip = (skip: unknown) => {
if (typeof skip !== 'number' || !Number.isInteger(skip) || skip < 0) {
throw new Error(`The skip must be a non-negative number: [${String(skip)}]`);
const assertCursor = (cursor: unknown) => {
if (typeof cursor !== 'string' || Number(cursor) < 0) {
throw new Error(`The cursor must be a stringified non-negative number: [${String(cursor)}]`);
}
};

Expand Down Expand Up @@ -86,38 +86,60 @@ export namespace v1 {
* @param context the current data context
* @returns a function for retrieving a paged array of people
* @throws Error if a data context is not provided
* @see {@link getAll} which provides the same data, but without having to manually account for paging
*/
export const getPage = (
context: DataContext
): (
personType: ContactTypeQualifier,
limit: number,
skip: number
) => Promise<Page<Person>> => {
): typeof curriedFn => {
assertDataContext(context);
const fn = adapt(context, Local.Person.v1.getPage, Remote.Person.v1.getPage);

/**
* Returns an array of people for the provided page specifications.
* @param personType the type of people to return
* @param cursor the number of people to skip. Default is 0.
* @param limit the maximum number of people to return. Default is 100.
* @param skip the number of people to skip. Default is 0.
* @returns an array of people for the provided page specifications.
* @throws Error if no type is provided or if the type is not for a person
* @throws Error if the provided `limit` value is `<=0`
* @throws Error if the provided `skip` value is `<0`
*/
const curriedFn = async (
personType: ContactTypeQualifier,
cursor = '0',
limit = 100,
skip = 0
): Promise<Page<Person>> => {
assertTypeQualifier(personType);
assertLimit(limit);
assertSkip(skip);
assertCursor(cursor);

return fn(personType, limit, skip);
return fn(personType, cursor, limit);
};
return curriedFn;
};

/**
* Returns a function for getting a generator that fetches people from the given data context.
* @param context the current data context
* @returns a function for getting a generator that fetches people
* @throws Error if a data context is not provided
*/
export const getAll = (
context: DataContext
): typeof curriedGen => {
assertDataContext(context);

/**
* Returns a generator for fetching all people with the given type
* @param personType the type of people to return
* @returns a generator for fetching all people with the given type
* @throws Error if no type is provided or if the type is not for a person
*/
const curriedGen = (personType: ContactTypeQualifier): AsyncGenerator<Person, void> => {
assertTypeQualifier(personType);
const getPage = context.bind(v1.getPage);
return getDocumentStream(getPage, personType);
};
return curriedGen;
};
}
4 changes: 2 additions & 2 deletions shared-libs/cht-datasource/src/remote/person.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ export namespace v1 {
/** @internal */
export const getPage = (remoteContext: RemoteDataContext) => (
personType: ContactTypeQualifier,
cursor: string,
limit: number,
skip: number
): Promise<Page<Person.v1.Person>> => getPeople(remoteContext)(
{'limit': limit.toString(), 'skip': skip.toString(), 'contactType': personType.contactType}
{'limit': limit.toString(), 'contactType': personType.contactType, cursor}
);
}
30 changes: 25 additions & 5 deletions shared-libs/cht-datasource/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ describe('CHT Script API - getDatasource', () => {
beforeEach(() => person = v1.person);

it('contains expected keys', () => {
expect(person).to.have.all.keys(['getByUuid', 'getByUuidWithLineage', 'getPageByType']);
expect(person).to.have.all.keys(['getByType', 'getByUuid', 'getByUuidWithLineage', 'getPageByType']);
});

it('getByUuid', async () => {
Expand Down Expand Up @@ -126,21 +126,41 @@ describe('CHT Script API - getDatasource', () => {
expect(byUuid.calledOnceWithExactly(qualifier.uuid)).to.be.true;
});

it('getPage', async () => {
it('getPageByType', async () => {
const expectedPeople: Page<Person.v1.Person> = {data: [], cursor: '-1'};
const personGetPage = sinon.stub().resolves(expectedPeople);
dataContextBind.returns(personGetPage);
const personType = 'person';
const limit = 2;
const skip = 1;
const cursor = '1';
const personTypeQualifier = { contactType: personType };
const byContactType = sinon.stub(Qualifier, 'byContactType').returns(personTypeQualifier);

const returnedPeople = await person.getPageByType(personType, limit, skip);
const returnedPeople = await person.getPageByType(personType, cursor, limit);

expect(returnedPeople).to.equal(expectedPeople);
expect(dataContextBind.calledOnceWithExactly(Person.v1.getPage)).to.be.true;
expect(personGetPage.calledOnceWithExactly(personTypeQualifier, limit, skip)).to.be.true;
expect(personGetPage.calledOnceWithExactly(personTypeQualifier, cursor, limit)).to.be.true;
expect(byContactType.calledOnceWithExactly(personType)).to.be.true;
});

it('getByType', async () => {
// eslint-disable-next-line @typescript-eslint/require-await
const mockAsyncGenerator = async function* () {
yield [];
};
const personGetAll = sinon.stub().resolves(mockAsyncGenerator);
dataContextBind.returns(personGetAll);
const personType = 'person';
const personTypeQualifier = { contactType: personType };
const byContactType = sinon.stub(Qualifier, 'byContactType').returns(personTypeQualifier);

// eslint-disable-next-line @typescript-eslint/await-thenable
const res = await person.getByType(personType);

expect(res).to.deep.equal(mockAsyncGenerator);
expect(dataContextBind.calledOnceWithExactly(Person.v1.getAll)).to.be.true;
expect(personGetAll.calledOnceWithExactly(personTypeQualifier)).to.be.true;
expect(byContactType.calledOnceWithExactly(personType)).to.be.true;
});
});
Expand Down
66 changes: 65 additions & 1 deletion shared-libs/cht-datasource/test/libs/data-context.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { adapt, assertDataContext } from '../../src/libs/data-context';
import { adapt, assertDataContext, getDocumentStream } from '../../src/libs/data-context';
import * as LocalContext from '../../src/local/libs/data-context';
import * as RemoteContext from '../../src/remote/libs/data-context';
import sinon, { SinonStub } from 'sinon';
Expand Down Expand Up @@ -118,4 +118,68 @@ describe('context lib', () => {
expect(remote.notCalled).to.be.true;
});
});

describe('getDocumentStream', () => {
let fetchFunctionStub: SinonStub;
const limit = 100;
const cursor = '0';

beforeEach(() => {
fetchFunctionStub = sinon.stub();
});

it('yields document one by one', async () => {
const mockDocs = [{ id: 1 }, { id: 2 }, { id: 3 }];
const mockPage = { data: mockDocs, cursor: '-1' };
const extraArg = 'value';
fetchFunctionStub.resolves(mockPage);

const generator = getDocumentStream(fetchFunctionStub, extraArg);

const results = [];

for await (const doc of generator) {
results.push(doc);
}

expect(results).to.deep.equal(mockDocs);
expect(fetchFunctionStub.calledOnceWithExactly(extraArg, cursor, limit)).to.be.true;
});

it('should handle multiple pages', async () => {
const mockDoc = { id: 1 };
const mockDocs1 = Array.from({ length: 100 }, () => ({ ...mockDoc }));
const mockPage1 = { data: mockDocs1, cursor: '100' };
const mockDocs2 = [{ id: 101 }];
const mockPage2 = { data: mockDocs2, cursor: '-1' };
const extraArg = 'value';

fetchFunctionStub.onFirstCall().resolves(mockPage1);
fetchFunctionStub.onSecondCall().resolves(mockPage2);

const generator = getDocumentStream(fetchFunctionStub, extraArg);

const results = [];
for await (const doc of generator) {
results.push(doc);
}

expect(results).to.deep.equal([...mockDocs1, ...mockDocs2]);
expect(fetchFunctionStub.callCount).to.equal(2);
expect(fetchFunctionStub.firstCall.args).to.deep.equal([extraArg, cursor, limit]);
expect(fetchFunctionStub.secondCall.args).to.deep.equal([extraArg, (Number(cursor) + limit).toString(), limit]);
});

it('should handle empty result', async () => {
fetchFunctionStub.resolves({ data: [], cursor: '-1' });

const generator = getDocumentStream(fetchFunctionStub, { limit: 10, skip: 0 });

const result = await generator.next();

expect(result.done).to.be.true;
expect(result.value).to.be.equal(undefined);
expect(fetchFunctionStub.calledOnce).to.be.true;
});
});
});
Loading

0 comments on commit bf8a77d

Please sign in to comment.