Skip to content

Commit

Permalink
fix a memory leak with items captured in closure and the callback its…
Browse files Browse the repository at this point in the history
…elf (#13)

* fix a memory leak with items captured in closure and the callback itself
* version guards for async_resource, and use async_resource in Call to avoid deprecated call
  • Loading branch information
jbeverly authored Aug 20, 2018
1 parent 952b7be commit 68ef2cb
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 82 deletions.
12 changes: 8 additions & 4 deletions async_queued_progress_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ class AsyncQueuedProgressWorker : public Nan::AsyncWorker {
virtual void HandleOKCallback() override {
Nan::HandleScope scope;
if (callback) {
#if defined(NODE_8_0_MODULE_VERSION) && (NODE_8_0_MODULE_VERSION > 51)
callback->Call(0, NULL, async_resource);
#else
callback->Call(0, NULL);
#endif
}
}

Expand All @@ -82,10 +86,6 @@ class AsyncQueuedProgressWorker : public Nan::AsyncWorker {

/// close our async_t handle and free resources (via AsyncClose method)
virtual void Destroy() override {
// Destroy happens in the v8 main loop; so we can flush out the Progress queue here before destroying
if (this->buffer_.read_available()) {
HandleProgressQueue();
}
// NOTABUG: Nan uses reinterpret_cast to pass uv_async_t around
uv_close(reinterpret_cast<uv_handle_t*>(async_.get()), AsyncClose);
}
Expand Down Expand Up @@ -137,6 +137,10 @@ class AsyncQueuedProgressWorker : public Nan::AsyncWorker {
// touch v8 data structures
static void AsyncClose(uv_handle_t* handle) {
auto worker = static_cast<AsyncQueuedProgressWorker*>(handle->data);
// Destroy happens in the v8 main loop; so we can flush out the Progress queue here before destroying
if (worker->buffer_.read_available()) {
worker->HandleProgressQueue();
}
delete worker;
}

Expand Down
5 changes: 5 additions & 0 deletions eventemitter_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class EventEmitter {
callback_->Call(1, info);
}

~Receiver() {
callback_->Reset();
delete callback_;
}

private:
Nan::Callback* callback_;
};
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"code": "^4.1.0",
"eslint": "^3.19.0",
"eslint-plugin-dependencies": "^2.4.0",
"leakage": "^0.4.0",
"mocha": "^3.4.2",
"node-gyp": "^3.6.2",
"path": "^0.12.7"
Expand Down
26 changes: 22 additions & 4 deletions test/cpp/eventemitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,36 +100,54 @@ class EmittingThing : public Nan::ObjectWrap {
}

static NAN_METHOD(Run) {
if (info.Length() != 1) {
Nan::Callback* fn(nullptr);
if (info.Length() < 1 || info.Length() > 2) {
info.GetIsolate()->ThrowException(Nan::TypeError("Wrong number of arguments"));
return;
}
if (!info[0]->IsNumber()) {
info.GetIsolate()->ThrowException(Nan::TypeError("First argument must be number"));
return;
}
if (info.Length() == 2) {
if(info[1]->IsFunction()) {
fn = new Nan::Callback(info[1].As<Function>());
} else {
info.GetIsolate()->ThrowException(Nan::TypeError("Second argument must be function"));
return;
}
}

int32_t n = info[0]->Int32Value();
auto thing = Nan::ObjectWrap::Unwrap<EmittingThing>(info.Holder());

TestWorker* worker = new TestWorker(nullptr, thing->emitter_, n);
TestWorker* worker = new TestWorker(fn, thing->emitter_, n);
Nan::AsyncQueueWorker(worker);
}

static NAN_METHOD(RunReentrant) {
if (info.Length() != 1) {
Nan::Callback* fn(nullptr);
if (info.Length() < 1 || info.Length() > 2) {
info.GetIsolate()->ThrowException(Nan::TypeError("Wrong number of arguments"));
return;
}
if (!info[0]->IsNumber()) {
info.GetIsolate()->ThrowException(Nan::TypeError("First argument must be number"));
return;
}
if (info.Length() == 2) {
if(info[1]->IsFunction()) {
fn = new Nan::Callback(info[1].As<Function>());
} else {
info.GetIsolate()->ThrowException(Nan::TypeError("Second argument must be function"));
return;
}
}

int32_t n = info[0]->Int32Value();
auto thing = Nan::ObjectWrap::Unwrap<EmittingThing>(info.Holder());

TestReentrantWorker* worker = new TestReentrantWorker(nullptr, thing->emitter_, n);
TestReentrantWorker* worker = new TestReentrantWorker(fn, thing->emitter_, n);
Nan::AsyncQueueWorker(worker);
}

Expand Down
187 changes: 113 additions & 74 deletions test/js/eventemitter.js
Original file line number Diff line number Diff line change
@@ -1,95 +1,134 @@
'use strict'

const expect = require('code').expect
const testRoot = require('path').resolve(__dirname, '..')
const bindings = require('bindings')({ 'module_root': testRoot, bindings: 'eventemitter' })

describe('Verify EventEmitter Single', function() {
it('should invoke the callback for test', function(done) {
let thing = new bindings.EmitterThing()
let n = 100
let k = 0
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k++)
if (k === n) {
done()
}
const iterate = require('leakage').iterate
const Promise = global.Promise

describe('Verify EventEmitter', function() {
describe('Verify EventEmitter Single', function() {
it('should invoke the callback for test', function(done) {
let thing = new bindings.EmitterThing()
let n = 100
let k = 0
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k++)
if (k === n) {
done()
}
})

thing.run(n)
})

thing.run(n)
})
})

describe('Verify EventEmitter Multi', function() {
it('should invoke the callbacks for test, test2, and test3', function(done) {
let thing = new bindings.EmitterThing()
let n = 300
let k = [0, 0, 0]
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[0]++)
describe('Verify EventEmitter Multi', function() {
it('should invoke the callbacks for test, test2, and test3', function(done) {
let thing = new bindings.EmitterThing()
let n = 300
let k = [0, 0, 0]
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[0]++)
})

thing.on('test2', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[1]++)
})
thing.on('test3', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[2]++)

if (k[2] === n) {
while (k[0] !== n || k[1] !== n) { /* do nothing */ }
done()
}
})
thing.run(n)
})

thing.on('test2', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[1]++)
})
thing.on('test3', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[2]++)

if (k[2] === n) {
while (k[0] !== n || k[1] !== n) { /* do nothing */ }
done()
}
})
thing.run(n)
})
})


describe('Verify EventEmitter Reentrant Single', function() {
it('should invoke the callback for test', function(done) {
let thing = new bindings.EmitterThing()
let n = 100
let k = 0
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k++)
if (k === n) {
done()
}
describe('Verify EventEmitter Reentrant Single', function() {
it('should invoke the callback for test', function(done) {
let thing = new bindings.EmitterThing()
let n = 100
let k = 0
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k++)
if (k === n) {
done()
}
})

thing.runReentrant(n)
})

thing.runReentrant(n)
})
})

describe('Verify EventEmitter Reentrant Multi', function() {
it('should invoke the callbacks for test, test2, and test3', function(done) {
let thing = new bindings.EmitterThing()
let n = 300
let k = [0, 0, 0]
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[0]++)
describe('Verify EventEmitter Reentrant Multi', function() {
it('should invoke the callbacks for test, test2, and test3', function(done) {
let thing = new bindings.EmitterThing()
let n = 300
let k = [0, 0, 0]
thing.on('test', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[0]++)
})

thing.on('test2', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[1]++)
})
thing.on('test3', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[2]++)

if (k[2] === n) {
while (k[0] !== n || k[1] !== n) { /* do nothing */ }
done()
}
})
thing.runReentrant(n)
})
})


thing.on('test2', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[1]++)
describe('Verify callback memory is reclaimed, even if callback is not waited on', function() {
it('Should not increase memory usage over time', function(done) {
this.timeout(15000)
iterate(() => {
let thing = new bindings.EmitterThing()
let v = new Buffer(1000)

thing.on('test', function(ev) {
v.compare(new Buffer(1000))
})
})
done()
})
thing.on('test3', function(ev) {
expect(ev).to.be.a.string()
expect(ev).to.equal('Test' + k[2]++)

if (k[2] === n) {
while (k[0] !== n || k[1] !== n) { /* do nothing */ }
done()
}
})

describe('Verify memory allocated in the test-class EmitterThing is reclaimed', function() {
it('Should not increase memory usage over time', function() {
this.timeout(15000)
return iterate.async(() => {
return new Promise((resolve, reject) => {
let thing = new bindings.EmitterThing()
let v = new Buffer(1000)
let n = 1

thing.on('test', function(ev) {
v.compare(new Buffer(1000))
})
thing.run(n, function() {
resolve()
})
})
})
})
thing.runReentrant(n)
})
})

Expand Down

0 comments on commit 68ef2cb

Please sign in to comment.