Skip to content

Commit

Permalink
⚗️ remove excessive queueing
Browse files Browse the repository at this point in the history
  • Loading branch information
elmarti committed Jan 4, 2023
1 parent af5e1f2 commit e39e43f
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 94 deletions.
6 changes: 5 additions & 1 deletion demo/browser/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ <h1>Open the console for output</h1>
const buttons = document.querySelectorAll('.test');
for(const button of buttons){
button.addEventListener('click', async () => {
try {
console.time(button.dataset.type);
const database = new camadb.Cama({
path: './.cama',
Expand Down Expand Up @@ -110,7 +111,10 @@ <h1>Open the console for output</h1>
console.timeEnd('Aggregation');
console.log({aggregationResult});
console.timeEnd(button.dataset.type);

}catch(err){
console.log('failed');
console.error(err);
}
});
}
</script>
Expand Down
1 change: 0 additions & 1 deletion demo/collection.cama/test/data~

This file was deleted.

29 changes: 13 additions & 16 deletions demo/test-insert.js → demo/test-insert.mjs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@

require('reflect-metadata');
const { Cama } = require('../dist');
const fs = require('fs');
const path = require ('path');
import 'reflect-metadata';
import { Cama } from '../dist/index.js';
import fs from 'fs';
import path from 'path';
const outputPath = path.join(process.cwd(), 'collection.cama');

const outputPath = path.join(__dirname, 'collection.cama');

async function demo() {
try {
console.log('cleaning output');
fs.rmdirSync(outputPath, {
Expand Down Expand Up @@ -54,15 +52,15 @@ Sed pellentesque ante quis nunc accumsan sodales. Nam vitae dui a quam bibendum
}
console.timeEnd('dummy data generated');

await collection.insertOne({
const insertOneResult = await collection.insertOne({
_id: 'test',
name: 'Dummy field',
description: `Data`,
});
console.log('insert col1');
await collection.insertMany(dummyData);

await collection.findMany({
console.log({insertOneResult});
const insertManyResult = await collection.insertMany(dummyData);
console.log({insertManyResult});
const findManyResult = await collection.findMany({
_id: {
$gte: 50000,
},
Expand All @@ -74,6 +72,7 @@ Sed pellentesque ante quis nunc accumsan sodales. Nam vitae dui a quam bibendum
offset: 100,
limit: 100
});
console.log({findManyResult});
await collection.findMany({
_id: {
$gte: 50000,
Expand Down Expand Up @@ -114,8 +113,6 @@ Sed pellentesque ante quis nunc accumsan sodales. Nam vitae dui a quam bibendum
console.timeEnd('Aggregation');
console.log({aggregationResult})

}

console.time('demo');
demo()
.then(() => console.timeEnd('demo'))
.catch((err) => console.error(err));

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"lint": "eslint src --ext .js,.ts",
"lint:fix": "eslint src --fix --ext .js,.ts",
"test": "jest --config jest.config.js",
"demo": "yarn build && node ./demo/test-insert.js",
"demo": "yarn build && node ./demo/test-insert.mjs",
"generate-docs": "typedoc --out docs ./src",
"browser-demo": "yarn build && cd demo/browser && yarn && yarn start"
},
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/queue-service.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

export interface IQueueService{
promise: Promise<any>;
add(task: any): any;
}
17 changes: 6 additions & 11 deletions src/modules/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class Collection implements ICollection {
private logger: ILogger;
private persistenceAdapter: IPersistenceAdapter;
private queryService: IQueryService<any>;
private queue: IQueueService;
public queue: IQueueService;
private destroyed = false;
private aggregator: IAggregator;

Expand Down Expand Up @@ -54,11 +54,11 @@ export class Collection implements ICollection {
async insertMany(rows:Array<any>):Promise<void> {
this.checkDestroyed();
this.logger.log(LogLevel.Debug, 'Inserting many');
await this.queue.add(() => (async (rows) => {
const pointer = this.logger.startTimer();

await this.persistenceAdapter.insert(rows);
this.logger.endTimer(LogLevel.Debug, pointer, "insert rows");
})(rows))

}

/**
Expand Down Expand Up @@ -89,12 +89,9 @@ export class Collection implements ICollection {
this.checkDestroyed();
this.logger.log(LogLevel.Debug, 'Finding many');
const pointer = this.logger.startTimer();
const result = await this.queue.add(() => (async (query, options) => {
return await this.queryService.filter(query, options);
})(query, options));
this.logger.endTimer(LogLevel.Debug, pointer, "find many");
const result = await this.queryService.filter(query, options);
this.logger.endTimer(LogLevel.Debug, pointer, "Finding many");
return result;

}

/**
Expand All @@ -106,9 +103,7 @@ export class Collection implements ICollection {
this.checkDestroyed();
this.logger.log(LogLevel.Debug, 'Updating many');
const pointer = this.logger.startTimer();
await this.queue.add(() => (async (query, delta) => {
await this.queryService.update(query, delta);
})(query, delta))
await this.queryService.update(query, delta);
this.logger.endTimer(LogLevel.Debug, pointer, "Updating many");

}
Expand Down
71 changes: 37 additions & 34 deletions src/modules/persistence/fs/collection-meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,37 @@ export class CollectionMeta implements ICollectionMeta {
@inject(TYPES.Logger) private logger:ILogger,
@inject(TYPES.System) private system: ISystem,
@inject(TYPES.QueueService) private queue: IQueueService) {
this.queue.add(async () => {
this.camaPath = this.system.getOutputPath();
this.dbPath = path.join(this.camaPath, collectionName);
this.fileName = `meta.json`;
this.collectionName = collectionName;
this.logger.log(LogLevel.Info, 'Ensuring cama folder exists: ' + this.camaPath);
if ((!await this.fs.exists(this.camaPath))) {
this.logger.log(LogLevel.Info, "Doesn't exist, creating: " + this.camaPath);

await this.fs.mkdir(path.join(this.camaPath));
}
this.logger.log(LogLevel.Info, 'Checking if folder exists for collection' + this.fileName);

if (await this.fs.exists(path.join(this.dbPath, this.fileName))) {
this.logger.log(LogLevel.Info, 'Already exists');
return;
}
this.logger.log(LogLevel.Info, 'Does not exist, creating' + this.fileName);

await this.fs.mkdir(path.join(this.dbPath));
this.meta = {
...collectionConfig,
collectionName,
};
this.logger.log(LogLevel.Info, 'Writing meta file');
await this.fs.writeData(this.camaPath, this.collectionName, []);
await this.fs.commit(this.camaPath, this.collectionName);
return this.fs.writeJSON<IMetaStructure>(this.dbPath, this.fileName, this.meta);
});
const initializeCollectionMetaTask = async () => {
this.camaPath = this.system.getOutputPath();
this.dbPath = path.join(this.system.getOutputPath(), collectionName);
this.fileName = `meta.json`;
this.collectionName = collectionName;
this.logger.log(LogLevel.Info, 'Ensuring cama folder exists: ' + this.camaPath);
if (!(await this.fs.exists(this.camaPath))) {
this.logger.log(LogLevel.Info, "Doesn't exist, creating: " + this.camaPath);

await this.fs.mkdir(path.join(this.camaPath));
}
this.logger.log(LogLevel.Info, 'Checking if folder exists for collection ' + this.fileName);

if (await this.fs.exists(path.join(this.dbPath, this.fileName))) {
this.logger.log(LogLevel.Info, 'Already exists');
return;
}
this.logger.log(LogLevel.Info, 'Does not exist, creating' + this.fileName);

await this.fs.mkdir(this.dbPath);
this.meta = {
...collectionConfig,
collectionName,
};
this.logger.log(LogLevel.Info, 'Initialising empty collection');
await this.fs.writeData(this.camaPath, this.collectionName, []);
await this.fs.commit(this.camaPath, this.collectionName);
this.logger.log(LogLevel.Info, 'Writing meta file');
return await this.fs.writeJSON<IMetaStructure>(this.dbPath, this.fileName, this.meta);
};
this.queue.add(initializeCollectionMetaTask);
}

/**
Expand All @@ -61,8 +63,7 @@ export class CollectionMeta implements ICollectionMeta {
* @param metaStructure - the value to be to be applied to the meta
*/
async update(collectionName: string, metaStructure: IMetaStructure): Promise<void> {

await this.queue.add(() => {
const updateCollectionMetaTask = () => {

this.logger.log(LogLevel.Info, 'Updating meta file');

Expand All @@ -71,17 +72,19 @@ export class CollectionMeta implements ICollectionMeta {
this.meta = Object.assign({}, this.meta, metaStructure);
this.logger.log(LogLevel.Info, 'Writing meta file');
return this.fs.writeJSON<IMetaStructure>(dbPath, 'meta.json', this.meta);
});
};
await this.queue.add(updateCollectionMetaTask);
}

/**
* Gets the in-memory meta value
*/
async get(): Promise<IMetaStructure|undefined> {
return await this.queue.add(() => {
const getCollectionMetaTask = () => {
this.logger.log(LogLevel.Info, 'Getting data from cache');
return this.meta;
});
};
return await this.queue.add(getCollectionMetaTask);

}
}
26 changes: 12 additions & 14 deletions src/modules/persistence/fs/fs-persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ export default class FSPersistence implements IPersistenceAdapter {
@inject(TYPES.System) private system: ISystem,
@inject(TYPES.QueueService) private queue: IQueueService
) {
this.outputPath = this.config.path || '.cama'
this.queue.add(() => this.getData());
this.outputPath = system.getOutputPath();
// const getDataTask = () => this.getData();
// this.queue.add(getDataTask);
}


Expand All @@ -37,13 +38,11 @@ export default class FSPersistence implements IPersistenceAdapter {
* @param rows - The rows to be inserted
*/
async insert<T>(rows: Array<any>): Promise<any> {
return await this.queue.add(()=> (async (rows) => {
const outputPath = path.join(process.cwd(), this.outputPath);
const data = [...(await this.getData()), ...rows];
await this.fs.writeData(outputPath, this.collectionName, data);
await this.fs.commit(outputPath, this.collectionName);
this.cache = data;
})(rows));
const data = [...(await this.getData()), ...rows];
await this.fs.writeData(this.outputPath, this.collectionName, data);
await this.fs.commit(this.outputPath, this.collectionName);
this.cache = data;

}

/**
Expand Down Expand Up @@ -84,11 +83,10 @@ export default class FSPersistence implements IPersistenceAdapter {
return this.cache;
}
async update(updated:any): Promise<void> {
await this.queue.add(() => (async (updated) => {
this.logger.log(LogLevel.Debug, `Writing file`);
await this.fs.writeData(this.outputPath, this.collectionName, updated);
this.cache = updated;
})(updated))
this.logger.log(LogLevel.Debug, `Writing file`);
await this.fs.writeData(this.outputPath, this.collectionName, updated);
this.cache = updated;

}

/**
Expand Down
9 changes: 4 additions & 5 deletions src/modules/persistence/fs/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import { TYPES } from '../../../types';
import { ISerializer } from '../../../interfaces/serializer.interface';
import { ILogger } from '../../../interfaces/logger.interface';
import { LogLevel } from '../../../interfaces/logger-level.enum';
import { IQueueService } from '../../../interfaces/queue-service.interface';

@injectable()
export class Fs implements IFS {



constructor(@inject(TYPES.Serializer) private serializer: ISerializer,
@inject(TYPES.Logger) private logger:ILogger,
@inject(TYPES.QueueService) private queue: IQueueService) {}
@inject(TYPES.Logger) private logger:ILogger) {}

/**
* Write a JSON object to a file
Expand Down Expand Up @@ -74,8 +72,9 @@ export class Fs implements IFS {
async commit(folderPath: string, collection:string): Promise<void> {
this.logger.log(LogLevel.Debug, `committing`);
this.logger.log(LogLevel.Debug, collection);
const outputPath = path.join(folderPath, `${collection}/data~`)
await this.queue.add(async () => nodeFs.rename(outputPath, outputPath.replace('~', '')))
const outputPath = path.join(folderPath, `${collection}/data~`);

await nodeFs.rename(outputPath, outputPath.replace('~', ''))
}

/**
Expand Down
24 changes: 13 additions & 11 deletions src/modules/queue/queue.service.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import { IQueueService } from '../../interfaces/queue-service.interface';
import { inject, injectable } from 'inversify';
import { TYPES } from '../../types';
import { IPersistenceAdapter } from '../../interfaces/persistence-adapter.interface';
import { ILogger } from '../../interfaces/logger.interface';
import { ICollectionMeta } from '../../interfaces/collection-meta.interface';
import PQueue from 'p-queue';

import { injectable } from 'inversify';

@injectable()
export class QueueService implements IQueueService {
private queue = new PQueue({ concurrency: 1 });

public promise = Promise.resolve();
public tasks:any = [];
constructor() {

}
add(task:any): Promise<any> {
return this.queue.add(task);
// return this.queue.add(task);
return new Promise((resolve, reject) => {
this.tasks.push(task);
this.promise = this.promise
.then(async () => task())
.then((result) => {
this.tasks.shift();
resolve(result);
}, reject);
});
}

}

0 comments on commit e39e43f

Please sign in to comment.