From f3482822b2abe819e5a25a218e08c9f9befd7c0f Mon Sep 17 00:00:00 2001 From: Mathew Heard Date: Wed, 30 Nov 2022 15:29:35 +1100 Subject: [PATCH] new Q.safeRace that does not use WeakMap and preserves the async stack --- index.js | 97 +++++++++++------------------- package.json | 2 +- test/q-tests.js | 156 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 64 deletions(-) diff --git a/index.js b/index.js index 2311138..839d3fe 100644 --- a/index.js +++ b/index.js @@ -148,71 +148,36 @@ function isPrimitive(value) { ); } -// Keys are the values passed to race, values are a record of data containing a -// set of deferreds and whether the value has settled. -const wm = new WeakMap(); - -// This NodeJS / v8 issue show the stupidity of Promise.race -// Issue: https://github.com/nodejs/node/issues/17469 -// Fortunately a nice guy (brainkim) wrote a safeRace function - -Q.safeRace = function (contenders) { - let deferred; - const result = new Promise((resolve, reject) => { - deferred = { resolve, reject }; - for (const contender of contenders) { - if (isPrimitive(contender)) { - // If the contender is a primitive, attempting to use it as a key in the - // weakmap would throw an error. Luckily, it is safe to call - // `Promise.resolve(contender).then` on a primitive value multiple times - // because the promise fulfills immediately. - Promise.resolve(contender).then(resolve, reject); - continue; - } - - let record = wm.get(contender); - if (record === undefined) { - record = { deferreds: new Set([deferred]), settled: false }; - wm.set(contender, record); - // This call to `then` happens once for the lifetime of the value. - Promise.resolve(contender).then( - (value) => { - record.settled = true; - for (const { resolve } of record.deferreds) { - resolve(value); - } - - record.deferreds.clear(); - }, - (err) => { - record.settled = true; - for (const { reject } of record.deferreds) { - reject(err); - } - - record.deferreds.clear(); - }, - ); - } else if (record.settled) { - // If the value has settled, it is safe to call - // `Promise.resolve(contender).then` on it. - Promise.resolve(contender).then(resolve, reject); - } else { - record.deferreds.add(deferred); - } +Q.safeRace = async function(contenders) { + let deferreds = [], promises = [] + //const wm = new WeakMap() + for (let contender of contenders) { + const deferred = Q.defer() + deferreds.push(deferred) + promises.push(deferred.promise) + + if(!contender.then){ + deferred.resolve(contender) + } else { + /*wm.set(contender, deferred) + contender.then(function(a){ + const d = wm.get(contender) + if(d) d.resolve(a) + }, function(a){ + const d = wm.get(contender) + if(d) d.reject(a) + })*/ + contender.then(deferred.resolve, deferred.reject) } - }); - - // The finally callback executes when any value settles, preventing any of - // the unresolved values from retaining a reference to the resolved value. - return result.finally(() => { - for (const contender of contenders) { - if (!isPrimitive(contender)) { - const record = wm.get(contender); - record.deferreds.delete(deferred); - } + } + + try { + await Promise.race(promises) + } finally { + for (const deferred of deferreds) { + deferred.resolve() } - }); + } } Q.cancelledRace = async function (promises, safeRace = true) { @@ -307,6 +272,12 @@ Q.finvoke = async function (object, method, ...args) { return Q.fcall(await object[method].bind(object), ...args) } +Q.nextTick = function(){ + return new Promise(resolve=>{ + process.nextTick(resolve) + }) +} + Q.resetUnhandledRejections = function () { } async function safeAll(values, cancelFn) { diff --git a/package.json b/package.json index f965de9..ae3df5f 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "A light implementaion of Q", "keywords": [], "scripts": { - "test": "mocha" + "test": "mocha --expose-gc" }, "main": "index.js", "author": "Mathew Heard", diff --git a/test/q-tests.js b/test/q-tests.js index f9e18a0..8a98a88 100644 --- a/test/q-tests.js +++ b/test/q-tests.js @@ -3,6 +3,73 @@ const Q = require('../index'), describe('Q tests', function(){ describe('safeRace', function(){ + it('should not leak on unresolved', async function(){ + async function randomString(length) { + let result = ""; + const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + for (let i = 0; i < length; i++) { + result += characters.charAt(Math.floor(Math.random() * characters.length)); + } + await Q.nextTick() + return result; + } + + function rs(length = 10000){ + let result = ""; + const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + for (let i = 0; i < length; i++) { + result += characters.charAt(Math.floor(Math.random() * characters.length)); + } + return result + } + + function getHeap(){ + global.gc() + const usage = process.memoryUsage(); + return usage.heapUsed + } + const ds = [] + + const beforeHeap = getHeap() + + for(let i = 0; i < 100; i++){ + const deferred1 = Q.defer() + const promise = randomString(10000) + + + const c = rs() + ds.push(deferred1) + await Promise.race([deferred1.promise, promise, c]) + } + + + await Q.delay(10) + const afterLeakHeap = getHeap() + + expect(afterLeakHeap - beforeHeap > 1000000).to.be.true + + + + for(let i = 0; i < 100; i++){ + const deferred1 = Q.defer() + const promise = randomString() + + const c = rs() + ds.push(deferred1) + + await Q.safeRace([deferred1.promise, promise, c]) + } + + await Q.delay(10) + const afterSafeHeap = getHeap() + + console.log({afterLeakHeap, afterSafeHeap, diff: afterSafeHeap - afterLeakHeap}) + + expect(afterSafeHeap - afterLeakHeap < 1000000).to.be.true + + // this is required to prevent GC of ds + expect(ds.length).to.be.eql(200) + }) it('should preserve stack', async function(){ async function testFn(){ const deferred = Q.defer() @@ -19,6 +86,73 @@ describe('Q tests', function(){ expect(ex.stack.toString()).to.contain('testFn') } }) + it('should preserve stack (2)', async function(){ + async function testFn(){ + const deferred = Q.defer() + async function a(){ + await Q.delay(1) + throw new Error('test') + } + const promise = a() + + // will fail if this becomes awaited + // this is because the stack is not preserved + + //await Q.delay(10) + const p = Q.safeRace([deferred.promise,promise]) + deferred.resolve() + return await p + } + try { + await testFn() + } catch(ex){ + expect(ex.stack.toString()).to.contain('testFn') + } + }) + it('should preserve stack (3)', async function(){ + async function testFn(){ + const deferred = Q.defer() + async function a(){ + await Q.delay(1) + throw new Error('test') + } + const promise = a() + + // unlike test 2 this will work, because the stack is preserved, however if promise is never resolved then promise will leak + await Promise.race([Q.delay(10), promise]) + + const p = Q.safeRace([deferred.promise,promise]) + deferred.resolve() + return await p + } + try { + await testFn() + } catch(ex){ + expect(ex.stack.toString()).to.contain('testFn') + } + }) + + it('should preserve stack (5)', async function(){ + async function testFn(){ + const deferred = Q.defer() + async function a(){ + await Q.delay(1) + throw new Error('test') + } + const promise = a() + + await Q.safeRace([Q.delay(10), promise]) + + const p = Q.cancelledRace([deferred.promise, promise]) + deferred.resolve() + return await p + } + try { + await testFn() + } catch(ex){ + expect(ex.stack.toString()).to.contain('testFn') + } + }) }) describe('cancelledRace', function(){ it('should preserve stack', async function(){ @@ -105,5 +239,27 @@ describe('Q tests', function(){ expect(ex.stack.toString()).to.contain('testFn') } }) + + it('should preserve stack (5)', async function(){ + async function testFn(){ + const deferred = Q.defer() + async function a(){ + await Q.delay(1) + throw new Error('test') + } + const promise = a() + + await Q.cancelledRace([Q.delay(10), promise]) + + const p = Q.cancelledRace([deferred.promise,promise]) + deferred.resolve() + return await p + } + try { + await testFn() + } catch(ex){ + expect(ex.stack.toString()).to.contain('testFn') + } + }) }) }) \ No newline at end of file