Skip to content

Commit

Permalink
allow concurrency option for iterateEntities
Browse files Browse the repository at this point in the history
  • Loading branch information
zemberdotnet committed Nov 29, 2023
1 parent 19bbe6a commit f747caf
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 4 deletions.
1 change: 1 addition & 0 deletions packages/integration-sdk-core/src/types/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface GraphObjectStore {
iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options?: { concurrency: number },
): Promise<void>;

iterateRelationships<T extends Relationship = Relationship>(
Expand Down
2 changes: 1 addition & 1 deletion packages/integration-sdk-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"globby": "^11.0.0",
"lodash": "^4.17.15",
"p-map": "^4.0.0",
"p-queue": "^6.3.0",
"p-queue": "^6.6.2",
"rimraf": "^3.0.2"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { readJsonFromPath, WalkDirectoryIterateeInput } from '../../fileSystem';

import { buildIndexDirectoryPath } from './path';
import { iterateParsedGraphFiles } from '../..';
import PQueue from 'p-queue';
import { onQueueSizeIsLessThanLimit } from '../queue';

interface BaseIterateCollectionIndexParams<GraphObject> {
type: string;
Expand All @@ -17,23 +19,34 @@ interface BaseIterateCollectionIndexParams<GraphObject> {
/**
 * Parameters accepted by `iterateCollectionTypeIndex`: the base iteration
 * parameters plus the collection to read and optional iteration settings.
 */
interface IterateCollectionIndexParams<GraphObject>
extends BaseIterateCollectionIndexParams<GraphObject> {
// Which collection is read from each parsed graph file; also passed to
// buildIndexDirectoryPath to locate the index directory.
collectionType: 'entities' | 'relationships';
// Optional iteration settings. `concurrency` falls back to 1 when omitted and
// is used as the queue-size limit while iterating.
options?: { concurrency: number };
}

/**
 * Walks the index directory for `type` within `collectionType` and invokes
 * `iteratee` exactly once for every graph object found, running at most
 * `options.concurrency` invocations at a time (default: 1, i.e. sequential).
 *
 * @param type - type used to build the index directory path
 * @param collectionType - which collection ('entities' | 'relationships') to read
 * @param iteratee - callback invoked once per graph object
 * @param options - optional settings; `concurrency` bounds in-flight iteratee calls
 * @throws the first error raised by `iteratee`, rethrown after all tasks that
 *   were already scheduled have settled
 */
async function iterateCollectionTypeIndex<T extends Entity | Relationship>({
  type,
  collectionType,
  iteratee,
  options,
}: IterateCollectionIndexParams<T>) {
  const path = buildIndexDirectoryPath({
    collectionType,
    type,
  });

  const concurrency = options?.concurrency ?? 1;
  // The queue itself must be given the concurrency. With p-queue's default
  // (Infinity) every task starts immediately, nothing ever waits in the queue,
  // and the size check below would never apply any back-pressure.
  const queue = new PQueue({ concurrency });

  // Tasks are fired without being awaited individually, so failures have to be
  // captured here; otherwise they would surface as unhandled rejections.
  let failed = false;
  let firstError: unknown;

  await iterateParsedGraphFiles(async (data) => {
    for (const graphObj of (data[collectionType] as T[]) || []) {
      if (failed) {
        // Stop scheduling new work once a task has failed.
        return;
      }

      // Fire the task without awaiting it: throttling is handled by
      // onQueueSizeIsLessThanLimit and completion by onIdle below.
      void queue
        .add(() => iteratee(graphObj))
        .catch((err: unknown) => {
          if (!failed) {
            failed = true;
            firstError = err;
          }
        });

      // Do not flood the queue: wait until fewer than `concurrency` tasks are
      // waiting before reading further objects.
      await onQueueSizeIsLessThanLimit(queue, concurrency);
    }
  }, path);

  // Wait for all scheduled tasks to complete.
  await queue.onIdle();

  if (failed) {
    throw firstError;
  }
}

export async function iterateEntityTypeIndex<T extends Entity = Entity>({
Expand Down
15 changes: 14 additions & 1 deletion packages/integration-sdk-runtime/src/storage/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
Relationship,
} from '@jupiterone/integration-sdk-core';
import { getSizeOfObject } from '../synchronization/batchBySize';
import PQueue from 'p-queue';
import { onQueueSizeIsLessThanLimit } from './queue';

export interface GraphObjectMetadata {
stepId: string;
Expand Down Expand Up @@ -145,8 +147,11 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {
async iterateEntities<T extends Entity = Entity>(
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
options?: { concurrency: number },
): Promise<void> {
const entityTypeKeysMap = this.entityTypeToKeysMap.get(filter._type);
const concurrency = options?.concurrency ?? 1;
const queue = new PQueue({ concurrency });

if (!entityTypeKeysMap) {
return;
Expand All @@ -163,8 +168,16 @@ export class InMemoryGraphObjectStore implements GraphObjectStore {
);
}

await iteratee(graphObjectData.entity as T);
// Fire the task without awaiting it (hence `void`): back-pressure comes from
// onQueueSizeIsLessThanLimit and completion is awaited via onIdle.
void queue.add(async () => iteratee(graphObjectData.entity as T));

Check failure on line 173 in packages/integration-sdk-runtime/src/storage/memory.ts

View workflow job for this annotation

GitHub Actions / test (18.x, ubuntu-latest)

Async arrow function has no 'await' expression

Check failure on line 173 in packages/integration-sdk-runtime/src/storage/memory.ts

View workflow job for this annotation

GitHub Actions / test (18.x, ubuntu-latest)

Async arrow function has no 'await' expression
// Don't flood the queue with promises: wait while `concurrency` or more tasks are queued.
// The limit counts queued tasks, not running ones, so there can be up to
// concurrency tasks (running) + concurrency tasks (queued) outstanding.
await onQueueSizeIsLessThanLimit(queue, concurrency);
}
// wait for everything to finish
await queue.onIdle();
}

async iterateRelationships<T extends Relationship = Relationship>(
Expand Down
32 changes: 32 additions & 0 deletions packages/integration-sdk-runtime/src/storage/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import PQueue from 'p-queue';

// Copied from: https://github.com/sindresorhus/p-queue/pull/131/files
// and backported to the commonjs compatible version.
//
// MIT License

// Copyright (c) Sindre Sorhus <[email protected]> (https://sindresorhus.com)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
/**
 * Resolves as soon as `queue.size` is strictly below `limit`.
 *
 * If the queue already satisfies the condition the promise resolves without
 * subscribing to anything. Otherwise a 'next' listener re-checks the size each
 * time the event fires and detaches itself once the condition holds.
 *
 * @param queue - queue whose `size` is compared against the limit
 * @param limit - exclusive upper bound on `queue.size`
 */
export async function onQueueSizeIsLessThanLimit(
  queue: PQueue,
  limit: number,
): Promise<void> {
  const belowLimit = (): boolean => queue.size < limit;

  if (!belowLimit()) {
    await new Promise<void>((done) => {
      const onNext = (): void => {
        if (!belowLimit()) {
          // Still at or above the limit; keep listening.
          return;
        }
        queue.removeListener('next', onNext);
        done();
      };

      queue.on('next', onNext);
    });
  }
}
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8594,7 +8594,7 @@ [email protected]:
resolved "https://registry.yarnpkg.com/p-pipe/-/p-pipe-3.1.0.tgz#48b57c922aa2e1af6a6404cb7c6bf0eb9cc8e60e"
integrity sha512-08pj8ATpzMR0Y80x50yJHn37NF6vjrqHutASaX5LiH5npS9XPvrUmscd9MF5R4fuYRHOxQR1FfMIlF7AzwoPqw==

[email protected], p-queue@^6.3.0:
[email protected], p-queue@^6.6.2:
version "6.6.2"
resolved "https://registry.yarnpkg.com/p-queue/-/p-queue-6.6.2.tgz#2068a9dcf8e67dd0ec3e7a2bcb76810faa85e426"
integrity sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==
Expand Down

0 comments on commit f747caf

Please sign in to comment.