From cfbf1a658431876e757a2906f9b32526d062f375 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 30 Oct 2023 13:15:55 +0100 Subject: [PATCH 1/3] feat!: drop streams support COMPASS-7124 We don't really need streams usage anymore and can just drop it. That's an easy way to ensure that we don't have any dependencies that require Node.js-specific builtins or polyfills of those. --- .github/workflows/unit-tests.yml | 2 + examples/parse-from-file.ts | 11 ++--- src/index.ts | 65 +++++++++------------------- src/stream.ts | 57 ------------------------- test/no-node.test.ts | 72 ++++++++++++++++++++++++++++++++ test/stream.test.ts | 41 ------------------ 6 files changed, 98 insertions(+), 150 deletions(-) delete mode 100644 src/stream.ts create mode 100644 test/no-node.test.ts delete mode 100644 test/stream.test.ts diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 39f647a..ff682a2 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -21,6 +21,8 @@ jobs: run: npm ci - name: Check run: npm run check + - name: Build + run: npm run build - name: Test run: npm test - name: Coverage diff --git a/examples/parse-from-file.ts b/examples/parse-from-file.ts index 3d1887c..80d63ab 100644 --- a/examples/parse-from-file.ts +++ b/examples/parse-from-file.ts @@ -4,8 +4,7 @@ import { pipeline as callbackPipeline, PassThrough, Transform } from 'stream'; import path from 'path'; import fs from 'fs'; import { promisify } from 'util'; - -import stream from '../src/stream'; +import { parseSchema } from '../src'; const schemaFileName = path.join(__dirname, './fanclub.json'); @@ -44,12 +43,10 @@ async function parseFromFile(fileName: string) { }); const dest = new PassThrough({ objectMode: true }); + const resultPromise = parseSchema(dest); const pipeline = promisify(callbackPipeline); - await pipeline(fileReadStream, createFileStreamLineParser(), stream(), dest); - let res; - for await (const result of dest) { - res = result; - } + await pipeline(fileReadStream, createFileStreamLineParser(), dest); + const res = await resultPromise; const dur = Date.now() - startTime; console.log(res); diff --git a/src/index.ts b/src/index.ts index dd2b196..b2f4a5f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,3 @@ -import type { AggregationCursor, Document, FindCursor } from 'mongodb'; -import { Readable, PassThrough } from 'stream'; -import { pipeline } from 'stream/promises'; - -import stream from './stream'; -import type { ParseStreamOptions } from './stream'; import { SchemaAnalyzer } from './schema-analyzer'; import type { ArraySchemaType, @@ -24,43 +18,30 @@ import type { } from './schema-analyzer'; import * as schemaStats from './stats'; -type MongoDBCursor = AggregationCursor | FindCursor; +type AnyIterable = Iterable | AsyncIterable; -function getStreamSource( - source: Document[] | MongoDBCursor | Readable -): Readable { - let streamSource: Readable; - if ('stream' in source) { - // MongoDB Cursor. - streamSource = source.stream(); - } else if ('pipe' in source) { - // Document stream. - streamSource = source; - } else if (Array.isArray(source)) { - // Array of documents. - streamSource = Readable.from(source); - } else { +function verifyStreamSource( + source: AnyIterable +): AnyIterable { + if (!(Symbol.iterator in source) && !(Symbol.asyncIterator in source)) { throw new Error( 'Unknown input type for `docs`. Must be an array, ' + 'stream or MongoDB Cursor.' ); } - return streamSource; + return source; } -async function schemaStream( - source: Document[] | MongoDBCursor | Readable, - options?: ParseStreamOptions -) { - const streamSource = getStreamSource(source); - - const dest = new PassThrough({ objectMode: true }); - await pipeline(streamSource, stream(options), dest); - for await (const result of dest) { - return result; +async function getCompletedSchemaAnalyzer( + source: AnyIterable, + options?: SchemaParseOptions +): Promise { + const analyzer = new SchemaAnalyzer(options); + for await (const doc of verifyStreamSource(source)) { + analyzer.analyzeDoc(doc); } - throw new Error('unreachable'); // `dest` always emits exactly one doc. + return analyzer; } /** @@ -68,28 +49,24 @@ async function schemaStream( * MongoDB cursor object to parse documents` from. */ async function parseSchema( - source: Document[] | MongoDBCursor | Readable, + source: AnyIterable, options?: SchemaParseOptions ): Promise { - return await schemaStream(source, options); + return (await getCompletedSchemaAnalyzer(source, options)).getResult(); } // Convenience shortcut for getting schema paths. async function getSchemaPaths( - source: Document[] | MongoDBCursor | Readable + source: AnyIterable ): Promise { - return await schemaStream(source, { - schemaPaths: true - }); + return (await getCompletedSchemaAnalyzer(source)).getSchemaPaths(); } // Convenience shortcut for getting the simplified schema. async function getSimplifiedSchema( - source: Document[] | MongoDBCursor | Readable + source: AnyIterable ): Promise { - return await schemaStream(source, { - simplifiedSchema: true - }); + return (await getCompletedSchemaAnalyzer(source)).getSimplifiedSchema(); } export default parseSchema; @@ -113,8 +90,6 @@ export type { }; export { - stream, - getStreamSource, parseSchema, getSchemaPaths, getSimplifiedSchema, diff --git a/src/stream.ts b/src/stream.ts deleted file mode 100644 index 5b47741..0000000 --- a/src/stream.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { Duplex } from 'stream'; - -import type { - Document -} from 'bson'; - -import { SchemaAnalyzer } from './schema-analyzer'; -import type { SchemaParseOptions } from './schema-analyzer'; - -export type ParseStreamOptions = SchemaParseOptions & { - simplifiedSchema?: boolean, - schemaPaths?: boolean; -}; - -export class ParseStream extends Duplex { - analyzer: SchemaAnalyzer; - options: ParseStreamOptions; - schemaPaths = false; - - constructor(options?: ParseStreamOptions) { - super({ objectMode: true }); - this.options = options || {}; - this.analyzer = new SchemaAnalyzer(options); - } - - _write(obj: Document, enc: unknown, cb: (err?: any) => void) { - try { - this.analyzer.analyzeDoc(obj); - this.emit('progress', obj); - cb(); - } catch (err: any) { - cb(err); - } - } - - _read() {} - - _final(cb: (err?: any) => void) { - try { - if (this.options.schemaPaths) { - this.push(this.analyzer.getSchemaPaths()); - } else if (this.options.simplifiedSchema) { - this.push(this.analyzer.getSimplifiedSchema()); - } else { - this.push(this.analyzer.getResult()); - } - this.push(null); - cb(); - } catch (err: any) { - cb(err); - } - } -} - -export default function makeParseStream(options?: ParseStreamOptions) { - return new ParseStream(options); -} diff --git a/test/no-node.test.ts b/test/no-node.test.ts new file mode 100644 index 0000000..356bc26 --- /dev/null +++ b/test/no-node.test.ts @@ -0,0 +1,72 @@ +import assert from 'assert'; +import vm from 'vm'; +import fs from 'fs'; +import path from 'path'; + +function createMockModuleSystem() { + const context = vm.createContext(Object.create(null)); + class Module { + exports = {}; + } + const modules = new Map(); + // Tiny (incomplete) CommonJS module system mock + function makeRequire(basename: string) { + return function require(identifier: string): any { + if (!identifier.startsWith('./') && !identifier.startsWith('../') && !identifier.startsWith('/')) { + let current = path.dirname(basename); + let previous: string; + do { + const nodeModulesEntry = path.resolve(current, 'node_modules', identifier); + previous = current; + current = path.dirname(current); + if (fs.existsSync(nodeModulesEntry)) { + return require(nodeModulesEntry); + } + } while (previous !== current); + throw new Error(`mock require() does not support Node.js built-ins (${identifier})`); + } + let file = path.resolve(path.dirname(basename), identifier); + if (!fs.existsSync(file) && fs.existsSync(`${file}.js`)) { + file = `${file}.js`; + } else if (fs.statSync(file).isDirectory()) { + if (fs.existsSync(`${file}/package.json`)) { + const pjson = JSON.parse(fs.readFileSync(`${file}/package.json`, 'utf8')); + file = path.resolve(file, pjson.main || 'index.js'); + } else if (fs.existsSync(`${file}/index.js`)) { + file = path.resolve(file, 'index.js'); + } + } + const existing = modules.get(file); + if (existing) { + return existing.exports; + } + const module = new Module(); + const source = fs.readFileSync(file); + vm.runInContext(`(function(require, module, exports, __filename, __dirname) {\n${source}\n})`, context)( + makeRequire(file), module, module.exports, file, path.dirname(file) + ); + modules.set(file, module); + return module.exports; + }; + } + return makeRequire; +} + +describe('getSchema should work in plain JS environment without Node.js or browser dependencies', function() { + const docs = [ + { foo: 'bar' }, + { country: 'Croatia' }, + { country: 'Croatia' }, + { country: 'England' } + ]; + + it('Check if return value is a promise', async function() { + const makeRequire = createMockModuleSystem(); + + const { parseSchema } = makeRequire(__filename)('../lib/index.js') as typeof import('..'); + + const result = await parseSchema(docs); + assert.strictEqual(result.count, 4); + assert.strictEqual(result.fields.map(f => f.name).join(','), 'country,foo'); + }); +}); diff --git a/test/stream.test.ts b/test/stream.test.ts deleted file mode 100644 index 12e1082..0000000 --- a/test/stream.test.ts +++ /dev/null @@ -1,41 +0,0 @@ -import assert from 'assert'; -import { Readable } from 'stream'; - -import nativeParser from '../src/stream'; -import type { Schema } from '../src/schema-analyzer'; - -const fixture = [ - { - foo: 1, - bar: 'test', - arr: [1, 2, 3] - }, - { - foo: 2, - baz: true, - arr: ['foo'] - }, - { - foo: 3, - bar: 'another test' - } -]; - -describe('native schema stream', function() { - let progress = 0; - it('should trigger progress event for each document', function(done) { - const native = nativeParser(); - Readable.from(fixture).pipe(native) - .on('progress', function() { - progress += 1; - }) - .on('data', function(schema: Schema) { - assert.ok(schema); - assert.equal(progress, 3); - }) - .on('end', function() { - assert.equal(progress, 3); - done(); - }); - }); -}); From d049e751a9a36cf9d57327b3d3d27c44113f7678 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 30 Oct 2023 13:30:37 +0100 Subject: [PATCH 2/3] revertme: debug --- test/no-node.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test/no-node.test.ts b/test/no-node.test.ts index 356bc26..4356a27 100644 --- a/test/no-node.test.ts +++ b/test/no-node.test.ts @@ -20,6 +20,7 @@ function createMockModuleSystem() { previous = current; current = path.dirname(current); if (fs.existsSync(nodeModulesEntry)) { + console.log({ previous, current, nodeModulesEntry, identifier }); return require(nodeModulesEntry); } } while (previous !== current); From 19dada619ae22b30bbb4bf559374d9c8e0f03014 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 30 Oct 2023 13:37:42 +0100 Subject: [PATCH 3/3] fixup --- test/no-node.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/no-node.test.ts b/test/no-node.test.ts index 4356a27..60c3e3f 100644 --- a/test/no-node.test.ts +++ b/test/no-node.test.ts @@ -12,7 +12,7 @@ function createMockModuleSystem() { // Tiny (incomplete) CommonJS module system mock function makeRequire(basename: string) { return function require(identifier: string): any { - if (!identifier.startsWith('./') && !identifier.startsWith('../') && !identifier.startsWith('/')) { + if (!identifier.startsWith('./') && !identifier.startsWith('../') && !path.isAbsolute(identifier)) { let current = path.dirname(basename); let previous: string; do { @@ -20,7 +20,6 @@ function createMockModuleSystem() { previous = current; current = path.dirname(current); if (fs.existsSync(nodeModulesEntry)) { - console.log({ previous, current, nodeModulesEntry, identifier }); return require(nodeModulesEntry); } } while (previous !== current);