Skip to content

Commit

Permalink
update types; add tests for concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
zemberdotnet committed Nov 30, 2023
1 parent 0d93246 commit 26a8889
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 4 deletions.
1 change: 1 addition & 0 deletions packages/integration-sdk-core/src/types/jobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export interface JobState {
iterateEntities: <T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options?: { concurrency: number },
) => Promise<void>;

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/integration-sdk-runtime/src/execution/jobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ export function createStepJobState({
return duplicateKeyTracker.hasKey(_key);
},

iterateEntities: (filter, iteratee) =>
graphObjectStore.iterateEntities(filter, iteratee),
iterateEntities: (filter, iteratee, options) =>
graphObjectStore.iterateEntities(filter, iteratee, options),

iterateRelationships: (filter, iteratee) =>
graphObjectStore.iterateRelationships(filter, iteratee),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,14 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options: { concurrency: number },
) {
await this.localGraphObjectStore.iterateEntities(filter, iteratee);
await this.localGraphObjectStore.iterateEntities(filter, iteratee, options);

await iterateEntityTypeIndex({
type: filter._type,
iteratee,
options,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { RelationshipClass } from '@jupiterone/data-model';
import { FlushedGraphObjectData } from '../../types';
import sortBy from 'lodash/sortBy';
import { getSizeOfObject } from '../../../synchronization/batchBySize';
import { warn } from 'console';

jest.mock('fs');

Expand Down Expand Up @@ -576,6 +577,42 @@ describe('iterateEntities', () => {
]),
);
});

test('should allow concurrency when iterating entities', async () => {
const { storageDirectoryPath, store } = setupFileSystemObjectStore();

const entityType = uuid();

type TestEntity = Entity & { randomField: string };

const entities = times(5, () =>
createTestEntity({ _type: entityType, randomField: 'field' }),
);

await store.addEntities(storageDirectoryPath, entities);

const taskBeginTimes = {};
const collectedEntities: TestEntity[] = [];
const task = async (e: TestEntity) => {
taskBeginTimes[e._key] = Date.now();
await new Promise((resolve) => setTimeout(resolve, 100));
collectedEntities.push(e);
};

await store.iterateEntities<TestEntity>({ _type: entityType }, task, {
concurrency: 5,
});

for (const e1 of entities) {
for (const e2 of entities) {
expect(
Math.abs(taskBeginTimes[e1._key] - taskBeginTimes[e2._key]),
).toBeLessThan(100);
}
}

expect(collectedEntities.length).toBe(5);
});
});

describe('iterateRelationships', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import { onQueueSizeIsLessThanLimit } from '../queue';
interface BaseIterateCollectionIndexParams<GraphObject> {
type: string;
iteratee: GraphObjectIteratee<GraphObject>;
options?: { concurrency: number };
}

interface IterateCollectionIndexParams<GraphObject>
extends BaseIterateCollectionIndexParams<GraphObject> {
collectionType: 'entities' | 'relationships';
options?: { concurrency: number };
}

async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
Expand Down Expand Up @@ -52,11 +52,13 @@ async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
export async function iterateEntityTypeIndex<T extends Entity = Entity>({
type,
iteratee,
options,
}: BaseIterateCollectionIndexParams<T>) {
await iterateCollectionTypeIndex({
type,
iteratee,
collectionType: 'entities',
options,
});
}

Expand Down
56 changes: 56 additions & 0 deletions packages/integration-sdk-runtime/src/storage/memory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
createTestEntity,
createTestRelationship,
} from '@jupiterone/integration-sdk-private-test-utils';
import { times } from 'lodash';

async function collectEntitiesByType(
store: InMemoryGraphObjectStore,
Expand Down Expand Up @@ -81,6 +82,61 @@ describe('#InMemoryGraphObjectStore', () => {
expect(await collectEntitiesByType(store, e2._type)).toEqual([e2]);
});

test('tasks should run concurrently', async () => {
const concurrency = 5;

const taskStartTimes = {};

// This tests the concurrency by making each task take _at least_ 250 ms.
// Then it compares the start time of each task to all the other tasks.
// Since concurrency is equal to the number of entities we are iterating
// all the tasks should start immediately and the difference in start time should be
// less than 250ms
const task = async (entity: Entity) => {
taskStartTimes[entity._key] = Date.now();
await new Promise((resolve) => setTimeout(resolve, 250)); // Mock task delay
};
const store = new InMemoryGraphObjectStore();

const entities = times(concurrency, () =>
createTestEntity({ _type: 'test' }),
);
await store.addEntities(uuid(), entities);

await store.iterateEntities({ _type: 'test' }, task, { concurrency });

// All tasks should have started with 250ms of each other since concurrency is 5
for (const e1 of entities) {
for (const e2 of entities) {
expect(
Math.abs(taskStartTimes[e2._key] - taskStartTimes[e1._key]),
).toBeLessThan(1000);
}
}
});

test('tasks should not run with more than expected concurrency', async () => {
const concurrency = 1;

const taskStartTimes = {};

const task = async (entity: Entity) => {
taskStartTimes[entity._key] = Date.now();
await new Promise((resolve) => setTimeout(resolve, 100));
};
const store = new InMemoryGraphObjectStore();

const entities = times(2, () => createTestEntity({ _type: 'test' }));
await store.addEntities(uuid(), entities);

await store.iterateEntities({ _type: 'test' }, task, { concurrency });
expect(
Math.abs(
taskStartTimes[entities[0]._key] - taskStartTimes[entities[1]._key],
),
).toBeGreaterThan(99);
});

test('should not throw if iterating entity _type that does not exist', async () => {
const store = new InMemoryGraphObjectStore();
expect(await collectEntitiesByType(store, uuid())).toEqual([]);
Expand Down

0 comments on commit 26a8889

Please sign in to comment.