diff --git a/.editorconfig b/.editorconfig index e7c5e669..e73580b3 100644 --- a/.editorconfig +++ b/.editorconfig @@ -9,3 +9,4 @@ end_of_line = lf [*.{js,json,ts}] indent_style = space indent_size = 2 +quote_type = single diff --git a/src/sync/polling/syncTasks/segmentsSyncTask.ts b/src/sync/polling/syncTasks/segmentsSyncTask.ts index f5a93711..9205c11e 100644 --- a/src/sync/polling/syncTasks/segmentsSyncTask.ts +++ b/src/sync/polling/syncTasks/segmentsSyncTask.ts @@ -14,7 +14,7 @@ export function segmentsSyncTaskFactory( fetchSegmentChanges: IFetchSegmentChanges, storage: IStorageSync, readiness: IReadinessManager, - settings: ISettings, + settings: ISettings ): ISegmentsSyncTask { return syncTaskFactory( settings.log, @@ -22,7 +22,8 @@ export function segmentsSyncTaskFactory( settings.log, segmentChangesFetcherFactory(fetchSegmentChanges), storage.segments, - readiness, + settings.sync.numConcurrentSegmentFetches, + readiness ), settings.scheduler.segmentsRefreshRate, 'segmentChangesUpdater' diff --git a/src/sync/polling/updaters/segmentChangesUpdater.ts b/src/sync/polling/updaters/segmentChangesUpdater.ts index 32f3fe7f..6f37a032 100644 --- a/src/sync/polling/updaters/segmentChangesUpdater.ts +++ b/src/sync/polling/updaters/segmentChangesUpdater.ts @@ -5,10 +5,18 @@ import { MaybeThenable } from '../../../dtos/types'; import { findIndex } from '../../../utils/lang'; import { SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants'; import { ILogger } from '../../../logger/types'; -import { LOG_PREFIX_INSTANTIATION, LOG_PREFIX_SYNC_SEGMENTS } from '../../../logger/constants'; +import { + LOG_PREFIX_INSTANTIATION, + LOG_PREFIX_SYNC_SEGMENTS, +} from '../../../logger/constants'; import { thenable } from '../../../utils/promise/thenable'; -type ISegmentChangesUpdater = (fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number) => Promise +type ISegmentChangesUpdater = ( + fetchOnlyNew?: boolean, + segmentName?: string, + noCache?: boolean, + till?: number +) => Promise; /** * Factory of SegmentChanges updater, a task that: @@ -25,37 +33,49 @@ export function segmentChangesUpdaterFactory( log: ILogger, segmentChangesFetcher: ISegmentChangesFetcher, segments: ISegmentsCacheBase, - readiness?: IReadinessManager, + numConcurrentSegmentFetches: number, + readiness?: IReadinessManager ): ISegmentChangesUpdater { - let readyOnAlreadyExistentState = true; - function updateSegment(segmentName: string, noCache?: boolean, till?: number, fetchOnlyNew?: boolean) { + async function updateSegment( + segmentName: string, + noCache?: boolean, + till?: number, + fetchOnlyNew?: boolean + ) { log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing segment ${segmentName}`); - let sincePromise = Promise.resolve(segments.getChangeNumber(segmentName)); + const since = await segments.getChangeNumber(segmentName); - return sincePromise.then(since => { - // if fetchOnlyNew flag, avoid processing already fetched segments - if (fetchOnlyNew && since !== -1) return -1; + // if fetchOnlyNew flag, avoid processing already fetched segments + if (fetchOnlyNew && since !== -1) return -1; + const changes = await segmentChangesFetcher( + since, + segmentName, + noCache, + till + ); - return segmentChangesFetcher(since, segmentName, noCache, till).then(function (changes) { - let changeNumber = -1; - const results: MaybeThenable[] = []; - changes.forEach(x => { - if (x.added.length > 0) results.push(segments.addToSegment(segmentName, x.added)); - if (x.removed.length > 0) results.push(segments.removeFromSegment(segmentName, x.removed)); - if (x.added.length > 0 || x.removed.length > 0) { - results.push(segments.setChangeNumber(segmentName, x.till)); - changeNumber = x.till; - } + let changeNumber = -1; + const results: MaybeThenable[] = []; + changes.forEach((x) => { + if (x.added.length > 0) + results.push(segments.addToSegment(segmentName, x.added)); + if (x.removed.length > 0) + results.push(segments.removeFromSegment(segmentName, x.removed)); + if (x.added.length > 0 || x.removed.length > 0) { + results.push(segments.setChangeNumber(segmentName, x.till)); + changeNumber = x.till; + } - log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processed ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`); - }); - // If at least one storage operation result is a promise, join all in a single promise. - if (results.some(result => thenable(result))) return Promise.all(results).then(() => changeNumber); - return changeNumber; - }); + log.debug( + `${LOG_PREFIX_SYNC_SEGMENTS}Processed ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}` + ); }); + // If at least one storage operation result is a promise, join all in a single promise. + if (results.some((result) => thenable(result))) + return Promise.all(results).then(() => changeNumber); + return changeNumber; } /** * Segments updater returns a promise that resolves with a `false` boolean value if it fails at least to fetch a segment or synchronize it with the storage. @@ -68,41 +88,64 @@ export function segmentChangesUpdaterFactory( * @param {boolean | undefined} noCache true to revalidate data to fetch on a SEGMENT_UPDATE notifications. * @param {number | undefined} till till target for the provided segmentName, for CDN bypass. */ - return function segmentChangesUpdater(fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number) { + return async function segmentChangesUpdater( + fetchOnlyNew?: boolean, + segmentName?: string, + noCache?: boolean, + till?: number + ) { log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Started segments update`); // If not a segment name provided, read list of available segments names to be updated. - let segmentsPromise = Promise.resolve(segmentName ? [segmentName] : segments.getRegisteredSegments()); - - return segmentsPromise.then(segmentNames => { - // Async fetchers are collected here. - const updaters: Promise[] = []; + const segmentNames = await (segmentName + ? [segmentName] + : segments.getRegisteredSegments()); + try { + let shouldUpdateFlags: number[] = []; - for (let index = 0; index < segmentNames.length; index++) { - updaters.push(updateSegment(segmentNames[index], noCache, till, fetchOnlyNew)); + // chunk in order to avoid an unbounded amount of simultaneous segment fetch requests + const chunkSize = numConcurrentSegmentFetches; + for (let i = 0; i < segmentNames.length; i += chunkSize) { + const chunk = segmentNames.slice(i, i + chunkSize); + shouldUpdateFlags = shouldUpdateFlags.concat( + await Promise.all( + chunk.map((segmentName) => + updateSegment(segmentName, noCache, till, fetchOnlyNew) + ) + ) + ); } - return Promise.all(updaters).then(shouldUpdateFlags => { - // if at least one segment fetch succeeded, mark segments ready - if (findIndex(shouldUpdateFlags, v => v !== -1) !== -1 || readyOnAlreadyExistentState) { - readyOnAlreadyExistentState = false; - if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED); - } - return true; - }); - }) + // if at least one segment fetch succeeded, mark segments ready + if ( + findIndex(shouldUpdateFlags, (v) => v !== -1) !== -1 || + readyOnAlreadyExistentState + ) { + readyOnAlreadyExistentState = false; + if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED); + } + return true; + } catch (error) { // Handles rejected promises at `segmentChangesFetcher`, `segments.getRegisteredSegments` and other segment storage operations. - .catch(error => { - if (error && error.statusCode === 403) { - // If the operation is forbidden, it may be due to permissions. Destroy the SDK instance. - // @TODO although factory status is destroyed, synchronization is not stopped - if (readiness) readiness.destroy(); - log.error(`${LOG_PREFIX_INSTANTIATION}: you passed a client-side type authorizationKey, please grab an SDK Key from the Split user interface that is of type server-side.`); - } else { - log.warn(`${LOG_PREFIX_SYNC_SEGMENTS}Error while doing fetch of segments. ${error}`); - } + if ( + error && + typeof error === 'object' && + 'statusCode' in error && + (error as any).statusCode === 403 + ) { + // If the operation is forbidden, it may be due to permissions. Destroy the SDK instance. + // @TODO although factory status is destroyed, synchronization is not stopped + if (readiness) readiness.destroy(); + log.error( + `${LOG_PREFIX_INSTANTIATION}: you passed a client-side type authorizationKey, please grab an SDK Key from the Split user interface that is of type server-side.` + ); + } else { + log.warn( + `${LOG_PREFIX_SYNC_SEGMENTS}Error while doing fetch of segments. ${error}` + ); + } - return false; - }); + return false; + } }; } diff --git a/src/types.ts b/src/types.ts index d868b92a..cd1310fe 100644 --- a/src/types.ts +++ b/src/types.ts @@ -118,7 +118,8 @@ export interface ISettings { impressionsMode: SplitIO.ImpressionsMode, __splitFiltersValidation: ISplitFiltersValidation, localhostMode?: SplitIO.LocalhostFactory, - enabled: boolean + enabled: boolean, + numConcurrentSegmentFetches: number }, readonly runtime: { ip: string | false diff --git a/src/utils/settingsValidation/__tests__/index.spec.ts b/src/utils/settingsValidation/__tests__/index.spec.ts index 8e0238c4..1f865d6a 100644 --- a/src/utils/settingsValidation/__tests__/index.spec.ts +++ b/src/utils/settingsValidation/__tests__/index.spec.ts @@ -59,6 +59,24 @@ describe('settingsValidation', () => { expect(settings.scheduler.impressionsRefreshRate).toBe(10000); }); + test('numConcurrentSegmentFetches should be configurable', () => { + const config = { + core: { authorizationKey: 'dummy token' }, + sync: { numConcurrentSegmentFetches: 20 }, + }; + let settings = settingsValidation(config, minimalSettingsParams); + + expect(settings.sync.numConcurrentSegmentFetches).toEqual(20); + + + settings = settingsValidation( + { ...config, sync: {numConcurrentSegmentFetches: 'asdf'} }, + minimalSettingsParams + ); + + expect(settings.sync.numConcurrentSegmentFetches).toEqual(10); + }); + test('impressionsMode should be configurable', () => { const config = { core: { authorizationKey: 'dummy token' }, diff --git a/src/utils/settingsValidation/__tests__/settings.mocks.ts b/src/utils/settingsValidation/__tests__/settings.mocks.ts index ad8d812a..8096687a 100644 --- a/src/utils/settingsValidation/__tests__/settings.mocks.ts +++ b/src/utils/settingsValidation/__tests__/settings.mocks.ts @@ -79,7 +79,8 @@ export const fullSettings: ISettings = { queryString: null, groupedFilters: { byName: [], byPrefix: [] } }, - enabled: true + enabled: true, + numConcurrentSegmentFetches: 10 }, version: 'jest', runtime: { diff --git a/src/utils/settingsValidation/index.ts b/src/utils/settingsValidation/index.ts index 700ada43..8f2faf3c 100644 --- a/src/utils/settingsValidation/index.ts +++ b/src/utils/settingsValidation/index.ts @@ -84,7 +84,8 @@ export const base = { // impressions collection mode impressionsMode: OPTIMIZED, localhostMode: undefined, - enabled: true + enabled: true, + numConcurrentSegmentFetches: 10 }, // Logger @@ -201,6 +202,11 @@ export function settingsValidation(config: unknown, validationParams: ISettingsV withDefaults.sync.enabled = true; } + // validate numConcurrentSegmentFetches is numeric and greater than 0 + if (!Number.isFinite(withDefaults.sync.numConcurrentSegmentFetches) || withDefaults.sync.numConcurrentSegmentFetches < 1) { + withDefaults.sync.numConcurrentSegmentFetches = base.sync.numConcurrentSegmentFetches; + } + // validate the `splitFilters` settings and parse splits query const splitFiltersValidation = validateSplitFilters(log, withDefaults.sync.splitFilters, withDefaults.mode); withDefaults.sync.splitFilters = splitFiltersValidation.validFilters;