Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

segmentChangesUpdater - limit concurrency #254

Open
wants to merge 8 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ end_of_line = lf
[*.{js,json,ts}]
indent_style = space
indent_size = 2
quote_type = single
5 changes: 3 additions & 2 deletions src/sync/polling/syncTasks/segmentsSyncTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ export function segmentsSyncTaskFactory(
fetchSegmentChanges: IFetchSegmentChanges,
storage: IStorageSync,
readiness: IReadinessManager,
settings: ISettings,
settings: ISettings
): ISegmentsSyncTask {
return syncTaskFactory(
settings.log,
segmentChangesUpdaterFactory(
settings.log,
segmentChangesFetcherFactory(fetchSegmentChanges),
storage.segments,
readiness,
settings.sync.numConcurrentSegmentFetches,
readiness
),
settings.scheduler.segmentsRefreshRate,
'segmentChangesUpdater'
Expand Down
149 changes: 96 additions & 53 deletions src/sync/polling/updaters/segmentChangesUpdater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ import { MaybeThenable } from '../../../dtos/types';
import { findIndex } from '../../../utils/lang';
import { SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants';
import { ILogger } from '../../../logger/types';
import { LOG_PREFIX_INSTANTIATION, LOG_PREFIX_SYNC_SEGMENTS } from '../../../logger/constants';
import {
LOG_PREFIX_INSTANTIATION,
LOG_PREFIX_SYNC_SEGMENTS,
} from '../../../logger/constants';
import { thenable } from '../../../utils/promise/thenable';

type ISegmentChangesUpdater = (fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number) => Promise<boolean>
type ISegmentChangesUpdater = (
fetchOnlyNew?: boolean,
segmentName?: string,
noCache?: boolean,
till?: number
) => Promise<boolean>;

/**
* Factory of SegmentChanges updater, a task that:
Expand All @@ -25,37 +33,49 @@ export function segmentChangesUpdaterFactory(
log: ILogger,
segmentChangesFetcher: ISegmentChangesFetcher,
segments: ISegmentsCacheBase,
readiness?: IReadinessManager,
numConcurrentSegmentFetches: number,
readiness?: IReadinessManager
): ISegmentChangesUpdater {

let readyOnAlreadyExistentState = true;

function updateSegment(segmentName: string, noCache?: boolean, till?: number, fetchOnlyNew?: boolean) {
async function updateSegment(
segmentName: string,
noCache?: boolean,
till?: number,
fetchOnlyNew?: boolean
) {
log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing segment ${segmentName}`);
let sincePromise = Promise.resolve(segments.getChangeNumber(segmentName));
const since = await segments.getChangeNumber(segmentName);

return sincePromise.then(since => {
// if fetchOnlyNew flag, avoid processing already fetched segments
if (fetchOnlyNew && since !== -1) return -1;
// if fetchOnlyNew flag, avoid processing already fetched segments
if (fetchOnlyNew && since !== -1) return -1;
const changes = await segmentChangesFetcher(
since,
segmentName,
noCache,
till
);

return segmentChangesFetcher(since, segmentName, noCache, till).then(function (changes) {
let changeNumber = -1;
const results: MaybeThenable<boolean | void>[] = [];
changes.forEach(x => {
if (x.added.length > 0) results.push(segments.addToSegment(segmentName, x.added));
if (x.removed.length > 0) results.push(segments.removeFromSegment(segmentName, x.removed));
if (x.added.length > 0 || x.removed.length > 0) {
results.push(segments.setChangeNumber(segmentName, x.till));
changeNumber = x.till;
}
let changeNumber = -1;
const results: MaybeThenable<boolean | void>[] = [];
changes.forEach((x) => {
if (x.added.length > 0)
results.push(segments.addToSegment(segmentName, x.added));
if (x.removed.length > 0)
results.push(segments.removeFromSegment(segmentName, x.removed));
if (x.added.length > 0 || x.removed.length > 0) {
results.push(segments.setChangeNumber(segmentName, x.till));
changeNumber = x.till;
}

log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processed ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`);
});
// If at least one storage operation result is a promise, join all in a single promise.
if (results.some(result => thenable(result))) return Promise.all(results).then(() => changeNumber);
return changeNumber;
});
log.debug(
`${LOG_PREFIX_SYNC_SEGMENTS}Processed ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`
);
});
// If at least one storage operation result is a promise, join all in a single promise.
if (results.some((result) => thenable(result)))
return Promise.all(results).then(() => changeNumber);
return changeNumber;
}
/**
* Segments updater returns a promise that resolves with a `false` boolean value if it fails at least to fetch a segment or synchronize it with the storage.
Expand All @@ -68,41 +88,64 @@ export function segmentChangesUpdaterFactory(
* @param {boolean | undefined} noCache true to revalidate data to fetch on a SEGMENT_UPDATE notifications.
* @param {number | undefined} till till target for the provided segmentName, for CDN bypass.
*/
return function segmentChangesUpdater(fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number) {
return async function segmentChangesUpdater(
fetchOnlyNew?: boolean,
segmentName?: string,
noCache?: boolean,
till?: number
) {
log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Started segments update`);

// If not a segment name provided, read list of available segments names to be updated.
let segmentsPromise = Promise.resolve(segmentName ? [segmentName] : segments.getRegisteredSegments());

return segmentsPromise.then(segmentNames => {
// Async fetchers are collected here.
const updaters: Promise<number>[] = [];
const segmentNames = await (segmentName
? [segmentName]
: segments.getRegisteredSegments());
try {
let shouldUpdateFlags: number[] = [];

for (let index = 0; index < segmentNames.length; index++) {
updaters.push(updateSegment(segmentNames[index], noCache, till, fetchOnlyNew));
// chunk in order to avoid an unbounded amount of simultaneous segment fetch requests
const chunkSize = numConcurrentSegmentFetches;
for (let i = 0; i < segmentNames.length; i += chunkSize) {
const chunk = segmentNames.slice(i, i + chunkSize);
shouldUpdateFlags = shouldUpdateFlags.concat(
await Promise.all(
chunk.map((segmentName) =>
updateSegment(segmentName, noCache, till, fetchOnlyNew)
)
)
);
}

return Promise.all(updaters).then(shouldUpdateFlags => {
// if at least one segment fetch succeeded, mark segments ready
if (findIndex(shouldUpdateFlags, v => v !== -1) !== -1 || readyOnAlreadyExistentState) {
readyOnAlreadyExistentState = false;
if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
}
return true;
});
})
// if at least one segment fetch succeeded, mark segments ready
if (
findIndex(shouldUpdateFlags, (v) => v !== -1) !== -1 ||
readyOnAlreadyExistentState
) {
readyOnAlreadyExistentState = false;
if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
}
return true;
} catch (error) {
// Handles rejected promises at `segmentChangesFetcher`, `segments.getRegisteredSegments` and other segment storage operations.
.catch(error => {
if (error && error.statusCode === 403) {
// If the operation is forbidden, it may be due to permissions. Destroy the SDK instance.
// @TODO although factory status is destroyed, synchronization is not stopped
if (readiness) readiness.destroy();
log.error(`${LOG_PREFIX_INSTANTIATION}: you passed a client-side type authorizationKey, please grab an SDK Key from the Split user interface that is of type server-side.`);
} else {
log.warn(`${LOG_PREFIX_SYNC_SEGMENTS}Error while doing fetch of segments. ${error}`);
}
if (
error &&
typeof error === 'object' &&
'statusCode' in error &&
(error as any).statusCode === 403
) {
// If the operation is forbidden, it may be due to permissions. Destroy the SDK instance.
// @TODO although factory status is destroyed, synchronization is not stopped
if (readiness) readiness.destroy();
log.error(
`${LOG_PREFIX_INSTANTIATION}: you passed a client-side type authorizationKey, please grab an SDK Key from the Split user interface that is of type server-side.`
);
} else {
log.warn(
`${LOG_PREFIX_SYNC_SEGMENTS}Error while doing fetch of segments. ${error}`
);
}

return false;
});
return false;
}
};
}
3 changes: 2 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ export interface ISettings {
impressionsMode: SplitIO.ImpressionsMode,
__splitFiltersValidation: ISplitFiltersValidation,
localhostMode?: SplitIO.LocalhostFactory,
enabled: boolean
enabled: boolean,
numConcurrentSegmentFetches: number
},
readonly runtime: {
ip: string | false
Expand Down
18 changes: 18 additions & 0 deletions src/utils/settingsValidation/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ describe('settingsValidation', () => {
expect(settings.scheduler.impressionsRefreshRate).toBe(10000);
});

test('numConcurrentSegmentFetches should be configurable', () => {
const config = {
core: { authorizationKey: 'dummy token' },
sync: { numConcurrentSegmentFetches: 20 },
};
let settings = settingsValidation(config, minimalSettingsParams);

expect(settings.sync.numConcurrentSegmentFetches).toEqual(20);


settings = settingsValidation(
{ ...config, sync: {numConcurrentSegmentFetches: 'asdf'} },
minimalSettingsParams
);

expect(settings.sync.numConcurrentSegmentFetches).toEqual(10);
});

test('impressionsMode should be configurable', () => {
const config = {
core: { authorizationKey: 'dummy token' },
Expand Down
3 changes: 2 additions & 1 deletion src/utils/settingsValidation/__tests__/settings.mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ export const fullSettings: ISettings = {
queryString: null,
groupedFilters: { byName: [], byPrefix: [] }
},
enabled: true
enabled: true,
numConcurrentSegmentFetches: 10
},
version: 'jest',
runtime: {
Expand Down
8 changes: 7 additions & 1 deletion src/utils/settingsValidation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ export const base = {
// impressions collection mode
impressionsMode: OPTIMIZED,
localhostMode: undefined,
enabled: true
enabled: true,
numConcurrentSegmentFetches: 10
},

// Logger
Expand Down Expand Up @@ -201,6 +202,11 @@ export function settingsValidation(config: unknown, validationParams: ISettingsV
withDefaults.sync.enabled = true;
}

// validate numConcurrentSegmentFetches is numeric and greater than 0
if (!Number.isFinite(withDefaults.sync.numConcurrentSegmentFetches) || withDefaults.sync.numConcurrentSegmentFetches < 1) {
withDefaults.sync.numConcurrentSegmentFetches = base.sync.numConcurrentSegmentFetches;
}

// validate the `splitFilters` settings and parse splits query
const splitFiltersValidation = validateSplitFilters(log, withDefaults.sync.splitFilters, withDefaults.mode);
withDefaults.sync.splitFilters = splitFiltersValidation.validFilters;
Expand Down