From 26a88891ba703b73dab0331b0f3b679835100f03 Mon Sep 17 00:00:00 2001 From: Matthew Zember Date: Thu, 30 Nov 2023 12:26:56 -0500 Subject: [PATCH] update types; add tests for concurrency --- .../src/types/jobState.ts | 1 + .../src/execution/jobState.ts | 4 +- .../FileSystemGraphObjectStore.ts | 4 +- .../FileSystemGraphObjectStore.test.ts | 37 ++++++++++++ .../FileSystemGraphObjectStore/indices.ts | 4 +- .../src/storage/memory.test.ts | 56 +++++++++++++++++++ 6 files changed, 102 insertions(+), 4 deletions(-) diff --git a/packages/integration-sdk-core/src/types/jobState.ts b/packages/integration-sdk-core/src/types/jobState.ts index a1e346f19..f9debbb4b 100644 --- a/packages/integration-sdk-core/src/types/jobState.ts +++ b/packages/integration-sdk-core/src/types/jobState.ts @@ -104,6 +104,7 @@ export interface JobState { iterateEntities: ( filter: GraphObjectFilter, iteratee: GraphObjectIteratee, + options?: { concurrency: number }, ) => Promise; /** diff --git a/packages/integration-sdk-runtime/src/execution/jobState.ts b/packages/integration-sdk-runtime/src/execution/jobState.ts index 49281dca0..a286f1f64 100644 --- a/packages/integration-sdk-runtime/src/execution/jobState.ts +++ b/packages/integration-sdk-runtime/src/execution/jobState.ts @@ -278,8 +278,8 @@ export function createStepJobState({ return duplicateKeyTracker.hasKey(_key); }, - iterateEntities: (filter, iteratee) => - graphObjectStore.iterateEntities(filter, iteratee), + iterateEntities: (filter, iteratee, options) => + graphObjectStore.iterateEntities(filter, iteratee, options), iterateRelationships: (filter, iteratee) => graphObjectStore.iterateRelationships(filter, iteratee), diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 1ab3156c3..0031e6236 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -211,12 +211,14 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { async iterateEntities( filter: GraphObjectFilter, iteratee: GraphObjectIteratee, + options: { concurrency: number }, ) { - await this.localGraphObjectStore.iterateEntities(filter, iteratee); + await this.localGraphObjectStore.iterateEntities(filter, iteratee, options); await iterateEntityTypeIndex({ type: filter._type, iteratee, + options, }); } diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index d3b2ded1f..bf699c6fe 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -29,6 +29,7 @@ import { RelationshipClass } from '@jupiterone/data-model'; import { FlushedGraphObjectData } from '../../types'; import sortBy from 'lodash/sortBy'; import { getSizeOfObject } from '../../../synchronization/batchBySize'; +import { warn } from 'console'; jest.mock('fs'); @@ -576,6 +577,42 @@ describe('iterateEntities', () => { ]), ); }); + + test('should allow concurrency when iterating entities', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore(); + + const entityType = uuid(); + + type TestEntity = Entity & { randomField: string }; + + const entities = times(5, () => + createTestEntity({ _type: entityType, randomField: 'field' }), + ); + + await store.addEntities(storageDirectoryPath, entities); + + const taskBeginTimes = {}; + const collectedEntities: TestEntity[] = []; + const task = async (e: TestEntity) => { + taskBeginTimes[e._key] = Date.now(); + await new Promise((resolve) => setTimeout(resolve, 100)); + collectedEntities.push(e); + }; + + await store.iterateEntities({ _type: entityType }, task, { + concurrency: 5, + }); + + for (const e1 of entities) { + for (const e2 of entities) { + expect( + Math.abs(taskBeginTimes[e1._key] - taskBeginTimes[e2._key]), + ).toBeLessThan(100); + } + } + + expect(collectedEntities.length).toBe(5); + }); }); describe('iterateRelationships', () => { diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts index 232c9c006..d47311ab2 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/indices.ts @@ -14,12 +14,12 @@ import { onQueueSizeIsLessThanLimit } from '../queue'; interface BaseIterateCollectionIndexParams { type: string; iteratee: GraphObjectIteratee; + options?: { concurrency: number }; } interface IterateCollectionIndexParams extends BaseIterateCollectionIndexParams { collectionType: 'entities' | 'relationships'; - options?: { concurrency: number }; } async function iterateCollectionTypeIndex({ @@ -52,11 +52,13 @@ async function iterateCollectionTypeIndex({ export async function iterateEntityTypeIndex({ type, iteratee, + options, }: BaseIterateCollectionIndexParams) { await iterateCollectionTypeIndex({ type, iteratee, collectionType: 'entities', + options, }); } diff --git a/packages/integration-sdk-runtime/src/storage/memory.test.ts b/packages/integration-sdk-runtime/src/storage/memory.test.ts index 8ac51ad0f..233a32148 100644 --- a/packages/integration-sdk-runtime/src/storage/memory.test.ts +++ b/packages/integration-sdk-runtime/src/storage/memory.test.ts @@ -5,6 +5,7 @@ import { createTestEntity, createTestRelationship, } from '@jupiterone/integration-sdk-private-test-utils'; +import { times } from 'lodash'; async function collectEntitiesByType( store: InMemoryGraphObjectStore, @@ -81,6 +82,61 @@ describe('#InMemoryGraphObjectStore', () => { expect(await collectEntitiesByType(store, e2._type)).toEqual([e2]); }); + test('tasks should run concurrently', async () => { + const concurrency = 5; + + const taskStartTimes = {}; + + // This tests the concurrency by making each task take _at least_ 250 ms. + // Then it compares the start time of each task to all the other tasks. + // Since concurrency is equal to the number of entities we are iterating + // all the tasks should start immediately and the difference in start time should be + // less than 250ms + const task = async (entity: Entity) => { + taskStartTimes[entity._key] = Date.now(); + await new Promise((resolve) => setTimeout(resolve, 250)); // Mock task delay + }; + const store = new InMemoryGraphObjectStore(); + + const entities = times(concurrency, () => + createTestEntity({ _type: 'test' }), + ); + await store.addEntities(uuid(), entities); + + await store.iterateEntities({ _type: 'test' }, task, { concurrency }); + + // All tasks should have started with 250ms of each other since concurrency is 5 + for (const e1 of entities) { + for (const e2 of entities) { + expect( + Math.abs(taskStartTimes[e2._key] - taskStartTimes[e1._key]), + ).toBeLessThan(1000); + } + } + }); + + test('tasks should not run with more than expected concurrency', async () => { + const concurrency = 1; + + const taskStartTimes = {}; + + const task = async (entity: Entity) => { + taskStartTimes[entity._key] = Date.now(); + await new Promise((resolve) => setTimeout(resolve, 100)); + }; + const store = new InMemoryGraphObjectStore(); + + const entities = times(2, () => createTestEntity({ _type: 'test' })); + await store.addEntities(uuid(), entities); + + await store.iterateEntities({ _type: 'test' }, task, { concurrency }); + expect( + Math.abs( + taskStartTimes[entities[0]._key] - taskStartTimes[entities[1]._key], + ), + ).toBeGreaterThan(99); + }); + test('should not throw if iterating entity _type that does not exist', async () => { const store = new InMemoryGraphObjectStore(); expect(await collectEntitiesByType(store, uuid())).toEqual([]);