Skip to content

Commit

Permalink
Rate limit in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ackava committed Oct 5, 2023
1 parent 4345428 commit c058fb1
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 2 deletions.
81 changes: 81 additions & 0 deletions src/models/Queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
export default class Queue<T> {

public get size() {
return this.length;
}

public get peek() {
if(!this.length) {
return void 0;
}
return this.store[this.start];
}

private store = new Array(4) as T[];

private start = 0;
private length = 0;

private get nextIndex() {
let index = this.start + this.length;
if (index >= this.store.length) {
index = (this.start + this.length) - this.store.length;
}
return index;
}

public enqueue(item: T) {
if (this.length === this.store.length) {
this.resize(this.store.length*2);
}
this.store[this.nextIndex] = item;
this.length++;
}

public dequeue() {
if(!this.length) {
return void 0;
}
const item = this.store[this.start];
this.start++;
if (this.start === this.store.length) {
this.start = 0;
}
this.length--;
return item;
}

private resize(n: number) {
const old = this.store;
this.store = new Array(n);
if (!this.length) {
this.start = 0;
return;
}
if (this.start > 0) {

const afterStart = Math.min(
this.start + this.length,
old.length - this.start
);

const afterStartItems = old.slice(
this.start);

// after start
this.store.splice(0,0, ... afterStartItems);

// before start
this.store.splice(afterStart, 0, ... old.slice(
0,
old.length - this.start
));
this.start = 0;
return;
}
if (this.length > 0) {
this.store.push(... old);
}
}

}
57 changes: 57 additions & 0 deletions src/models/TaskManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import Queue from "./Queue";

export default class TaskManager {

public rateLimit = 10;

private running: Set<any> = new Set();

private waiting: Queue<{ resolve, reject, fx }> = new Queue();

protected run<TR>(fx: (... a: any[]) => Promise<TR>): Promise<TR> {

const pr = new Promise((resolve, reject) => {
this.waiting.enqueue({ resolve, reject, fx });
});

this.processQueue();

return pr as Promise<TR>;
}

protected processQueue() {
for(;;) {
if (this.running.size >= this.rateLimit) {
return;
}

if (!this.waiting.size) {
return;
}

const t = this.waiting.dequeue();
if (!t) {
return;
}

const { fx, resolve, reject } = t;

this.running.add(fx);

fx().then(
(r) => {
this.running.delete(fx);
setTimeout(this.processQueue, 1, this);
resolve(r);
},
(e) => {
this.running.delete(fx);
setTimeout(this.processQueue, 1, this);
reject(e);
}
);

}
}

}
16 changes: 14 additions & 2 deletions src/services/HttpSession.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { CancelToken } from "@web-atoms/core/dist/core/types";
import JsonError from "@web-atoms/core/dist/services/http/JsonError";
import Queue from "../models/Queue";
import TaskManager from "../models/TaskManager";

export type URIWithSearchParams = [string, {[k: string]: any}];

Expand Down Expand Up @@ -43,11 +45,21 @@ export interface IHttpRequest extends RequestInit {
hideActivityIndicator?: boolean;
}

export default class HttpSession {
export default class HttpSession extends TaskManager {

/**
* Allow maximum parallel fetch...
*/
public rateLimit: number = 10;


protected resultConverter = (e) => e;

protected async fetchJson<T>(options: IHttpRequest): Promise<T> {
protected fetchJson<T>(options: IHttpRequest): Promise<T> {
return this.run(() => this.uncheckedFetchJson<T>(options));
}

protected async uncheckedFetchJson<T>(options: IHttpRequest): Promise<T> {
if (options.cancelToken) {
const ab = new AbortController();
options.signal = ab.signal;
Expand Down
49 changes: 49 additions & 0 deletions src/tests/task-manager/TaskManagerTests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import Test from "@web-atoms/unit-test/dist/Test";
import TestItem from "@web-atoms/unit-test/dist/TestItem";
import TaskManager from "../../models/TaskManager";
import Assert from "@web-atoms/unit-test/dist/Assert";

const sleep = (n) => new Promise((r) => setTimeout(r, n));

export class TestTaskManager extends TaskManager {

public sleep(fx: () => any) {
return this.run(async () => {
fx();
await sleep(100);
})
}
}

export default class TaskManagerTests extends TestItem {

@Test
public async test() {

let n = 1;

const tm = new TestTaskManager();
tm.rateLimit = 2;

const tasks = [];

for (let index = 0; index < 20; index++) {
tasks.push(tm.sleep(() => {
n++;
console.log([index, n]);
}));
}

await sleep(50);

Assert.equals(3, n);

await sleep(150);

Assert.equals(5, n);

await Promise.all(tasks);

Assert.equals(21, n);
}
}

0 comments on commit c058fb1

Please sign in to comment.