Skip to content

Commit

Permalink
Memleak (#169)
Browse files Browse the repository at this point in the history
* fixes #151

* clean up callback explicitly

* replace nanomsg_socket_t with NanoMsgPollCtx

* clean up ipc files in case tests don't

* properly free NanoMsgPollCtx

* clean up fcb

* remove duplicate code

by combining PollSendSocket and PollReceiveSocket into PollSocket

* move more logic into PollCtx

* everything is awe^H^H^Hconst

* prefer static_cast to reinterpret_cast

* avoid allocating a Nan::Callback, thanks @kkoopa

* more const

* clean up WrapPointer a little

* use sizeof instead literal value

* clean up handle->data

* remove a lot of unused WrapPointer code, the ultimate goal is to remove as much black magic as possible

* move PollCtx to its own compilation unit

* move UV_READABLE check back into NanomsgReadable

* consistent use of using keyword

* fix race condition in test/shutdown.js

It had assumed that a 5 publisher-1 subscriber topology would receive
messages in order (which on Travis CI seems to not occur) so had unsafe
shutdown code that would clean up prematurely in the case of late
deliverance.  This would mess up the count of subscribers since the
messages effectly race from publishers to subscriber.

As an example: before this patch, messages could be delivered in this
order:
```
p1 x10
p5
<shutdown occurs>
p2
<error, count is all messed up>
```
Now, each socket checks to see that the last one out hits the lights.

* Revert "clean up handle->data"

@m-ohuchi points out with valgrind that this is an invalid memory write.

This reverts commit 4a0d012.

* delete node_pointer.h, move pointer wrapping logic into PollCtx

* remove unused includes and then sort them

* change the line of contributors table to insert my info

* remove offset in UnwrapPointer; document wrap_pointer_callback

* don't deref an unwrapped PollCtx if it's NULL
  • Loading branch information
nickdesaulniers authored and reqshark committed Dec 20, 2016
1 parent a271345 commit 0461f5c
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 185 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use_system_libnanomsg:
npm i --use_system_libnanomsg=true

check:
rm -f some_address /tmp/*.ipc
find test/*.js test/standalone/*.js | xargs -n 1 node | node_modules/tap-difflet/bin/tap-difflet
rm -f some_address /tmp/*.ipc

test:
find test/*.js test/standalone/*.js | xargs -n 1 node | node_modules/tap-nyan/bin/cmd.js
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ Issues and pull requests welcome!
<tr><th align="left">Bent Cardan</th><td><a href="https://github.com/reqshark/">GitHub/reqshark</a></td><td><a href="http://twitter.com/rekshark">Twitter/@rekshark</a></td></tr>
<tr><th align="left">Deepak Prabhakara</th><td><a href="https://github.com/deepakprabhakara/">GitHub/deepakprabhakara</a></td><td><a href="http://twitter.com/deepakprab">Twitter/@deepakprab</a></td></tr>
<tr><th align="left">Flynn Joffray</th><td><a href="https://github.com/nucleardreamer/">GitHub/nucleardreamer</a></td><td><a href="http://twitter.com/nucleardreamer">Twitter/@nucleardreamer</a></td></tr>
<tr><th align="left">m-ohuchi</th><td><a href="https://github.com/m-ohuchi">GitHub/m-ohuchi</a></td><td>-</td></tr>
<tr><th align="left">Michele Comignano</th><td><a href="https://github.com/comick">GitHub/comick</a></td><td>-</td></tr>
<tr><th align="left">Nick Desaulniers</th><td><a href="https://github.com/nickdesaulniers">GitHub/nickdesaulniers</a></td><td><a href="http://twitter.com/LostOracle">Twitter/@LostOracle</a></td></tr>
<tr><th align="left">Tim Cameron Ryan</th><td><a href="https://github.com/tcr">GitHub/tcr</a></td><td><a href="http://twitter.com/timcameronryan">Twitter/@timcameronryan</a></td></tr>
Expand Down
5 changes: 4 additions & 1 deletion binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
'targets': [
{
'target_name': 'node_nanomsg',
'sources': [ 'src/node_nanomsg.cc' ],
'sources': [
'src/node_nanomsg.cc',
'src/poll_ctx.cc'
],
'include_dirs': [
"<!(node -e \"require('nan')\")",
],
Expand Down
4 changes: 2 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,15 @@ Socket.prototype._receive = function () {

Socket.prototype._startPollSend = function () {
if (!this._pollSend) {
this._pollSend = nn.PollSendSocket(this.binding, function (events) {
this._pollSend = nn.PollSocket(this.binding, true, function (events) {
if (events) this.flush();
}.bind(this));
}
}

Socket.prototype._startPollReceive = function () {
if (!this._pollReceive) {
this._pollReceive = nn.PollReceiveSocket(this.binding, function (events) {
this._pollReceive = nn.PollSocket(this.binding, false, function (events) {
if (events) this._receive();
}.bind(this));
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"Bent Cardan (https://github.com/reqshark)",
"Deepak Prabhakara (https://github.com/deepakprabhakara)",
"Flynn Joffray (https://github.com/nucleardreamer)",
"m-ohuchi (https://github.com/m-ohuchi)",
"Michele Comignano (https://github.com/comick)",
"Nick Desaulniers (https://github.com/nickdesaulniers)",
"Tim Cameron Ryan (https://github.com/tcr)",
Expand Down
120 changes: 35 additions & 85 deletions src/node_nanomsg.cc
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
#include <stdio.h>
#include <stdlib.h>
#include "node_pointer.h"
#include "bus.h"
#include "inproc.h"
#include "ipc.h"
#include "nn.h"
#include "pair.h"
#include "pipeline.h"
#include "poll_ctx.h"
#include "pubsub.h"
#include "reqrep.h"
#include "survey.h"
#include "tcp.h"
#include "ws.h"

#include <nn.h>

#include <inproc.h>
#include <ipc.h>
#include <tcp.h>
#include <ws.h>

#include <pubsub.h>
#include <pipeline.h>
#include <bus.h>
#include <pair.h>
#include <reqrep.h>
#include <survey.h>

using v8::Array;
using v8::Function;
using v8::FunctionTemplate;
using v8::Local;
Expand Down Expand Up @@ -70,7 +64,7 @@ NAN_METHOD(Chan) {
int s = Nan::To<int>(info[0]).FromJust();
int level = NN_SUB;
int option = Nan::To<int>(info[1]).FromJust();
v8::String::Utf8Value str(info[2]);
String::Utf8Value str(info[2]);

info.GetReturnValue().Set(
Nan::New<Number>(nn_setsockopt(s, level, option, *str, str.length())));
Expand Down Expand Up @@ -105,15 +99,14 @@ NAN_METHOD(Send) {
info.GetReturnValue().Set(Nan::New<Number>(nn_send(
s, node::Buffer::Data(info[1]), node::Buffer::Length(info[1]), flags)));
} else {
v8::String::Utf8Value str(info[1]);
String::Utf8Value str(info[1]);
info.GetReturnValue().Set(
Nan::New<Number>(nn_send(s, *str, str.length(), flags)));
}
}

void fcb(char *data, void *hint) {
static void fcb(char *data, void *) {
nn_freemsg(data);
(void) hint;
}

NAN_METHOD(Recv) {
Expand All @@ -125,7 +118,7 @@ NAN_METHOD(Recv) {
int len = nn_recv(s, &buf, NN_MSG, flags);

if (len > -1) {
v8::Local<v8::Object> h = Nan::NewBuffer(buf, len, fcb, 0).ToLocalChecked();
Local<Object> h = Nan::NewBuffer(buf, len, fcb, 0).ToLocalChecked();
info.GetReturnValue().Set(h);
} else {
info.GetReturnValue().Set(Nan::New<Number>(len));
Expand Down Expand Up @@ -191,68 +184,26 @@ NAN_METHOD(Err) {
info.GetReturnValue().Set(Nan::New(nn_strerror(nn_errno())).ToLocalChecked());
}

typedef struct nanomsg_socket_s {
uv_poll_t poll_handle;
uv_os_sock_t sockfd;
Nan::Callback *callback;
} nanomsg_socket_t;

void NanomsgReadable(uv_poll_t *req, int status, int events) {
Nan::HandleScope scope;

nanomsg_socket_t *context;
context = reinterpret_cast<nanomsg_socket_t *>(req);

if (events & UV_READABLE) {
Local<Value> argv[] = { Nan::New<Number>(events) };
context->callback->Call(1, argv);
}
NAN_METHOD(PollSocket) {
const int s = Nan::To<int>(info[0]).FromJust();
const bool is_sender = Nan::To<bool>(info[1]).FromJust();
const Local<Function> cb = info[2].As<Function>();
PollCtx *context = new PollCtx(s, is_sender, cb);
info.GetReturnValue().Set(PollCtx::WrapPointer(context, sizeof context));
}

NAN_METHOD(PollSendSocket) {
int s = Nan::To<int>(info[0]).FromJust();
Nan::Callback *callback = new Nan::Callback(info[1].As<Function>());

nanomsg_socket_t *context;
size_t siz = sizeof(uv_os_sock_t);

context = reinterpret_cast<nanomsg_socket_t *>(calloc(1, sizeof *context));
context->poll_handle.data = context;
context->callback = callback;
nn_getsockopt(s, NN_SOL_SOCKET, NN_SNDFD, &context->sockfd, &siz);

if (context->sockfd != 0) {
uv_poll_init_socket(uv_default_loop(), &context->poll_handle,
context->sockfd);
uv_poll_start(&context->poll_handle, UV_READABLE, NanomsgReadable);
info.GetReturnValue().Set(WrapPointer(context, 8));
}
}

NAN_METHOD(PollReceiveSocket) {
int s = Nan::To<int>(info[0]).FromJust();
Nan::Callback *callback = new Nan::Callback(info[1].As<Function>());

nanomsg_socket_t *context;
size_t siz = sizeof(uv_os_sock_t);

context = reinterpret_cast<nanomsg_socket_t *>(calloc(1, sizeof *context));
context->poll_handle.data = context;
context->callback = callback;
nn_getsockopt(s, NN_SOL_SOCKET, NN_RCVFD, &context->sockfd, &siz);

if (context->sockfd != 0) {
uv_poll_init_socket(uv_default_loop(), &context->poll_handle,
context->sockfd);
uv_poll_start(&context->poll_handle, UV_READABLE, NanomsgReadable);
info.GetReturnValue().Set(WrapPointer(context, 8));
}
static void close_cb(uv_handle_t *handle) {
const PollCtx* const context = static_cast<PollCtx*>(handle->data);
delete context;
}

NAN_METHOD(PollStop) {
nanomsg_socket_t *context = UnwrapPointer<nanomsg_socket_t *>(info[0]);
int r = uv_poll_stop(&context->poll_handle);
info.GetReturnValue().Set(Nan::New<Number>(r));
PollCtx* const context = PollCtx::UnwrapPointer(info[0]);
if (context != NULL) {
uv_close(reinterpret_cast<uv_handle_t*>(&context->poll_handle), close_cb);
}
// TODO: the else case should never happen. Maybe add an assert or
// something.
}

class NanomsgDeviceWorker : public Nan::AsyncWorker {
Expand Down Expand Up @@ -297,9 +248,9 @@ NAN_METHOD(DeviceWorker) {
Nan::AsyncQueueWorker(new NanomsgDeviceWorker(callback, s1, s2));
}

#define EXPORT_METHOD(C, S) \
Nan::Set(C, Nan::New(#S).ToLocalChecked(), \
Nan::GetFunction(Nan::New<FunctionTemplate>(S)).ToLocalChecked());
#define EXPORT_METHOD(C, S) \
Nan::Set(C, Nan::New(#S).ToLocalChecked(), \
Nan::GetFunction(Nan::New<FunctionTemplate>(S)).ToLocalChecked());

NAN_MODULE_INIT(InitAll) {
Nan::HandleScope scope;
Expand All @@ -314,8 +265,7 @@ NAN_MODULE_INIT(InitAll) {
EXPORT_METHOD(target, Send);
EXPORT_METHOD(target, Recv);
EXPORT_METHOD(target, Errno);
EXPORT_METHOD(target, PollSendSocket);
EXPORT_METHOD(target, PollReceiveSocket);
EXPORT_METHOD(target, PollSocket);
EXPORT_METHOD(target, PollStop);
EXPORT_METHOD(target, DeviceWorker);
EXPORT_METHOD(target, SymbolInfo);
Expand Down
66 changes: 0 additions & 66 deletions src/node_pointer.h

This file was deleted.

53 changes: 53 additions & 0 deletions src/poll_ctx.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "nn.h"
#include "poll_ctx.h"

using v8::Function;
using v8::Local;
using v8::Number;
using v8::Value;

static void NanomsgReadable(uv_poll_t* req, int /* status */, int events) {
const PollCtx* const context = static_cast<PollCtx*>(req->data);
if (events & UV_READABLE) {
context->invoke_callback(events);
}
}

void PollCtx::begin_poll (const int s, const bool is_sender) {
size_t siz = sizeof(uv_os_sock_t);
nn_getsockopt(s, NN_SOL_SOCKET, is_sender ? NN_SNDFD : NN_RCVFD, &sockfd,
&siz);
if (sockfd != 0) {
uv_poll_init_socket(uv_default_loop(), &poll_handle, sockfd);
uv_poll_start(&poll_handle, UV_READABLE, NanomsgReadable);
}
}

PollCtx::PollCtx (const int s, const bool is_sender,
const Local<Function> cb): callback(cb) {
// TODO: maybe container_of can be used instead?
// that would save us this assignment, and ugly static_cast hacks.
poll_handle.data = this;
begin_poll(s, is_sender);
}

void PollCtx::invoke_callback (const int events) const {
Nan::HandleScope scope;
Local<Value> argv[] = { Nan::New<Number>(events) };
callback.Call(1, argv);
}

// Nan will invoke this once it's done with the Buffer, in case we wanted to
// free ptr. In this case, ptr is a PollCtx that we're not done with and don't
// want to free yet (not until PollStop is invoked), so we do nothing.
static void wrap_pointer_cb(char * /* data */, void * /* hint */) {}

Local<Value> PollCtx::WrapPointer (void* ptr, size_t length) {
return Nan::NewBuffer(static_cast<char *>(ptr), length, wrap_pointer_cb, 0)

This comment has been minimized.

Copy link
@nickdesaulniers

nickdesaulniers Dec 29, 2016

Author Owner

we don't need to pass length, move sizeof here

.ToLocalChecked();
}

PollCtx* PollCtx::UnwrapPointer (v8::Local<v8::Value> buffer) {
return reinterpret_cast<PollCtx*>(node::Buffer::HasInstance(buffer) ?
node::Buffer::Data(buffer.As<v8::Object>()) : NULL);
}
17 changes: 17 additions & 0 deletions src/poll_ctx.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include <nan.h>

class PollCtx {
private:
const Nan::Callback callback;
uv_os_sock_t sockfd; // for libnanomsg
void begin_poll (const int s, const bool is_sender);
public:
uv_poll_t poll_handle; // for libuv
PollCtx (const int s, const bool is_sender,
const v8::Local<v8::Function> cb);
void invoke_callback (const int events) const;
static v8::Local<v8::Value> WrapPointer (void* ptr, size_t length);
static PollCtx* UnwrapPointer (v8::Local<v8::Value> buffer);
};
Loading

0 comments on commit 0461f5c

Please sign in to comment.