Skip to content

Commit

Permalink
Merge pull request #869 from storyblok/fix/PRO-803-throttle-fix
Browse files Browse the repository at this point in the history
fix: update throttle
  • Loading branch information
alexjoverm authored Nov 19, 2024
2 parents 8bdd0ab + 1b83ebb commit 59be18a
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 171 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ jobs:
- name: Install dependencies
run: pnpm install
- name: Run Lint
run: pnpm run lint
run: pnpm run lint
43 changes: 26 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import type {
ISbCustomFetch,
ISbLinkURLObject,
ISbNode,
ISbResponse,
ISbResponseData,
ISbResult,
ISbStories,
ISbStoriesParams,
ISbStory,
ISbStoryData,
ISbStoryParams,
ThrottleFn,
} from './interfaces';

let memory: Partial<IMemoryType> = {};
Expand All @@ -47,15 +48,6 @@ interface ISbFlatMapped {
data: any;
}

export interface ISbResponseData {
link_uuids: string[];
links: string[];
rel_uuids: string[];
rels: any;
story: ISbStoryData;
stories: Array<ISbStoryData>;
}

const _VERSION = {
V1: 'v1',
V2: 'v2',
Expand All @@ -68,7 +60,7 @@ class Storyblok {
private client: SbFetch;
private maxRetries: number;
private retriesDelay: number;
private throttle: ThrottleFn;
private throttle: ReturnType<typeof throttledQueue>;
private accessToken: string;
private cache: ISbCache;
private helpers: SbHelpers;
Expand Down Expand Up @@ -148,7 +140,12 @@ class Storyblok {

this.maxRetries = config.maxRetries || 10;
this.retriesDelay = 300;
this.throttle = throttledQueue(this.throttledRequest, rateLimit, 1000);
this.throttle = throttledQueue(
this.throttledRequest.bind(this),
rateLimit,
1000,
);

this.accessToken = config.accessToken || '';
this.relations = {} as RelationsType;
this.links = {} as LinksType;
Expand Down Expand Up @@ -281,7 +278,9 @@ class Storyblok {
): Promise<ISbResponseData> {
const url = `/${slug}`;

return Promise.resolve(this.throttle('post', url, params, fetchOptions));
return Promise.resolve(
this.throttle('post', url, params, fetchOptions),
) as Promise<ISbResponseData>;
}

public put(
Expand All @@ -291,7 +290,9 @@ class Storyblok {
): Promise<ISbResponseData> {
const url = `/${slug}`;

return Promise.resolve(this.throttle('put', url, params, fetchOptions));
return Promise.resolve(
this.throttle('put', url, params, fetchOptions),
) as Promise<ISbResponseData>;
}

public delete(
Expand All @@ -301,7 +302,9 @@ class Storyblok {
): Promise<ISbResponseData> {
const url = `/${slug}`;

return Promise.resolve(this.throttle('delete', url, params, fetchOptions));
return Promise.resolve(
this.throttle('delete', url, params, fetchOptions),
) as Promise<ISbResponseData>;
}

public getStories(
Expand Down Expand Up @@ -616,7 +619,12 @@ class Storyblok {

return new Promise(async (resolve, reject) => {
try {
const res = await this.throttle('get', url, params, fetchOptions);
const res = (await this.throttle(
'get',
url,
params,
fetchOptions,
)) as ISbResponse;
if (res.status !== 200) {
return reject(res);
}
Expand All @@ -635,7 +643,8 @@ class Storyblok {
}

if (response.data.story || response.data.stories) {
const resolveId = (this.resolveCounter = ++this.resolveCounter % 1000);
const resolveId = (this.resolveCounter
= ++this.resolveCounter % 1000);
await this.resolveStories(response.data, params, `${resolveId}`);
}

Expand Down
31 changes: 29 additions & 2 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ResponseFn } from './sbFetch';
import type Method from './constants';

export interface ISbStoriesParams
extends Partial<ISbStoryData>,
Expand Down Expand Up @@ -241,6 +242,7 @@ export interface ISbResponse {
data: any;
status: number;
statusText: string;
headers: any;
}

export interface ISbError {
Expand Down Expand Up @@ -339,10 +341,35 @@ export interface ISbLinks {
};
}

export interface ThrottleFn {
(...args: any): any;
export interface Queue<T> {
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
args: T;
}

export interface ISbResponseData {
link_uuids: string[];
links: string[];
rel_uuids: string[];
rels: any;
story: ISbStoryData;
stories: Array<ISbStoryData>;
}

export interface ISbThrottle<
T extends (...args: Parameters<T>) => ReturnType<T>,
> {
abort?: () => void;
(...args: Parameters<T>): Promise<unknown>;
}

export type ISbThrottledRequest = (
type: Method,
url: string,
params: ISbStoriesParams,
fetchOptions?: ISbCustomFetch
) => Promise<unknown>;

export type AsyncFn = (...args: any) => [] | Promise<ISbResult>;

export type ArrayFn = (...args: any) => void;
Expand Down
48 changes: 48 additions & 0 deletions src/throttlePromise.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { describe, expect, it, vi } from 'vitest';
import throttledQueue from './throttlePromise';

// Mock function to simulate async work with a delay
const mockFn = vi.fn(async (input) => {
await new Promise(resolve => setTimeout(resolve, 200)); // Simulate async delay
return input;
});

describe('throttledQueue', () => {
it('should resolve or reject all promises after the queue finishes, even when aborting', async () => {
const throttled = throttledQueue(mockFn, 3, 10); // Throttle with 3 concurrent tasks
const promises: Promise<any>[] = [];

// Generate 10 tasks and push them to the promises array
for (let i = 0; i < 10; i++) {
promises.push(throttled(i));
if (i === 5) {
throttled.abort(); // but abort at call #6
}
}

const results = await Promise.allSettled(promises);
results.forEach((result) => {
expect(['fulfilled', 'rejected']).toContain(result.status);
});
});
it('should enforce sequential resolution when throttle limit is exceeded', async () => {
const throttled = throttledQueue(mockFn, 1, 100); // Limit of 1, 100ms interval

const start = Date.now();
const promises = [
throttled('test1'),
throttled('test2'),
throttled('test3'),
];

const results = await Promise.all(promises);
const duration = Date.now() - start;

// Expected behavior:
// Since each call has a 200ms delay, and there's a 100ms throttle interval and limit is 1,
// and each successive call should only start after the previous one completes,
// then the total duration should be around 800ms (200*3 + 100*2).
expect(results).toEqual(['test1', 'test2', 'test3']);
expect(duration).toBeGreaterThanOrEqual(800);
});
});
75 changes: 34 additions & 41 deletions src/throttlePromise.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
import type { ThrottleFn } from './interfaces';
import type { ISbThrottle, Queue } from './interfaces';

interface Shifted {
args: any;
self: any;
resolve: (args: any) => any;
}

interface Queue {
resolve: (args: any) => any;
reject: (args: any) => any;
args: any[];
self: any;
}

interface ISbThrottle {
abort: () => any;
(args: []): Promise<Queue>;
name: string;
AbortError?: () => void;
class AbortError extends Error {
constructor(msg: string) {
super(msg);
this.name = 'AbortError';
}
}

function throttledQueue(fn: ThrottleFn, limit: number, interval: number) {
function throttledQueue<T extends (...args: Parameters<T>) => ReturnType<T>>(
fn: T,
limit: number,
interval: number,
): ISbThrottle<T> {
if (!Number.isFinite(limit)) {
throw new TypeError('Expected `limit` to be a finite number');
}
Expand All @@ -29,45 +20,49 @@ function throttledQueue(fn: ThrottleFn, limit: number, interval: number) {
throw new TypeError('Expected `interval` to be a finite number');
}

const queue: Queue[] = [];
const queue: Queue<Parameters<T>>[] = [];
let timeouts: ReturnType<typeof setTimeout>[] = [];
let activeCount = 0;
let isAborted = false;

const next = function () {
const next = async () => {
activeCount++;

const x = queue.shift();
if (x) {
const res = await fn(...x.args);
x.resolve(res);
}

const id = setTimeout(() => {
activeCount--;

if (queue.length > 0) {
next();
}

timeouts = timeouts.filter((currentId) => {
return currentId !== id;
});
timeouts = timeouts.filter(currentId => currentId !== id);
}, interval);

if (!timeouts.includes(id)) {
timeouts.push(id);
}

const x = queue.shift() as unknown as Shifted;
x.resolve(fn.apply(x.self, x.args));
};

const throttled: ISbThrottle = function (
this: ISbThrottle,
...args: []
): Promise<Queue> {
const self = this;
const throttled: ISbThrottle<T> = (...args) => {
if (isAborted) {
return Promise.reject(
new Error(
'Throttled function is already aborted and not accepting new promises',
),
);
}

return new Promise((resolve, reject) => {
queue.push({
resolve,
reject,
args,
self,
});

if (activeCount < limit) {
Expand All @@ -76,16 +71,14 @@ function throttledQueue(fn: ThrottleFn, limit: number, interval: number) {
});
};

throttled.abort = function () {
throttled.abort = () => {
isAborted = true;
timeouts.forEach(clearTimeout);
timeouts = [];

queue.forEach((x) => {
x.reject(function (this: ISbThrottle) {
Error.call(this, 'Throttled function aborted');
this.name = 'AbortError';
});
});
queue.forEach(x =>
x.reject(() => new AbortError('Throttle function aborted')),
);
queue.length = 0;
};

Expand Down
Loading

0 comments on commit 59be18a

Please sign in to comment.