Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various memory corruption and unwanted behavior scenarios #591

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/subscribers/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void nchan_subscriber_timeout_ev_handler(ngx_event_t *ev) {
#if FAKESHARD
memstore_fakeprocess_push(sub->owner);
#endif
sub->dequeue_after_response = 1;
//sub->dequeue_after_response = 1; //see https://github.com/slact/nchan/pull/591
sub->fn->respond_status(sub, NGX_HTTP_REQUEST_TIMEOUT, &NCHAN_HTTP_STATUS_408, NULL);
#if FAKESHARD
memstore_fakeprocess_pop();
Expand Down
2 changes: 1 addition & 1 deletion src/subscribers/internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ static ngx_int_t internal_respond_status(subscriber_t *self, ngx_int_t status_co
internal_subscriber_t *f = (internal_subscriber_t *)self;
DBG("%p status %i", self, status_code);
if(status_code == NGX_HTTP_GONE) {
self->dequeue_after_response = 1;
//self->dequeue_after_response = 1; //see https://github.com/slact/nchan/pull/591
}
f->respond_status(status_code, (void *)status_line, f->privdata);
reset_timer(f);
Expand Down
38 changes: 30 additions & 8 deletions src/subscribers/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,10 @@ static ngx_int_t websocket_publish(full_subscriber_t *fsub, ngx_buf_t *buf, int
//move the msg pool
d->pool = fsub->publisher.msg_pool;
d->msgbuf = buf;
d->subrequest = NULL;
fsub->publisher.msg_pool = NULL;

if(fsub->publisher.intercept || fsub->publisher.upstream_request_url == NULL) { // don't need to send request upstream
d->subrequest = NULL;
websocket_publish_continue(d);
}
else {
Expand Down Expand Up @@ -1086,7 +1086,7 @@ static ngx_int_t websocket_perform_handshake(full_subscriber_t *fsub) {

static void websocket_reading(ngx_http_request_t *r);

static ngx_buf_t *websocket_inflate_message(full_subscriber_t *fsub, ngx_buf_t *msgbuf, ngx_pool_t *pool) {
static ngx_buf_t *websocket_inflate_message(full_subscriber_t *fsub, ngx_buf_t *msgbuf, ngx_pool_t *pool, uint64_t max, int *result) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller below expects *result to be initialized if the return value is NULL. However, there appear to be three (rare?) cases below where this does not happen.

Also a -1 magic number for "message too large" is a bit ugly, and it may be better to follow a pattern where the primary return value indicates whether/how the call is successful and the pointer is returned via indirection (ngx_buf_t **result).

#if (NGX_ZLIB)
z_stream *strm;
int rc;
Expand Down Expand Up @@ -1115,7 +1115,7 @@ static ngx_buf_t *websocket_inflate_message(full_subscriber_t *fsub, ngx_buf_t *

strm = fsub->deflate.zstream_in;

outbuf = nchan_inflate(strm, msgbuf, fsub->sub.request, pool);
outbuf = nchan_inflate(strm, msgbuf, fsub->sub.request, pool, max, result);
return outbuf;
#else
return NULL;
Expand Down Expand Up @@ -1304,6 +1304,8 @@ static void websocket_reading(ngx_http_request_t *r) {
ngx_connection_t *c;
ngx_buf_t *msgbuf, buf;
//ngx_str_t msg_in_str;
ngx_http_core_loc_conf_t *clcf;
int result;
retry:
ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
fsub = (full_subscriber_t *)ctx->sub;
Expand Down Expand Up @@ -1468,8 +1470,15 @@ static void websocket_reading(ngx_http_request_t *r) {
return websocket_reading_finalize(r);
}

//TODO: check max websocket message length
clcf = ngx_http_get_module_loc_conf(ctx->sub->request, ngx_http_core_module);

if(frame->payload == NULL) {
if(clcf->client_max_body_size && (uint64_t)clcf->client_max_body_size < frame->payload_len) {
websocket_send_close_frame_cstr(fsub, CLOSE_POLICY_VIOLATION, "Message too large.");
ws_destroy_msgpool(fsub);
fsub->publisher.msg_pool = NULL;
return websocket_reading_finalize(r);
}
if(ws_get_msgpool(fsub) == NULL) {
ERR("failed to get msgpool");
websocket_send_close_frame(fsub, CLOSE_INTERNAL_SERVER_ERROR, NULL);
Expand All @@ -1486,6 +1495,13 @@ static void websocket_reading(ngx_http_request_t *r) {
}

set_buffer(&buf, frame->payload, frame->last, frame->payload_len);
if(clcf->client_max_body_size && clcf->client_max_body_size < ngx_buf_size(&buf)) {
websocket_send_close_frame_cstr(fsub, CLOSE_POLICY_VIOLATION, "Message too large.");
ws_destroy_msgpool(fsub);
fsub->publisher.msg_pool = NULL;
return websocket_reading_finalize(r);
}


if (frame->payload_len > 0 && (rc = ws_recv(c, rev, &buf, frame->payload_len)) != NGX_OK) {
DBG("ws_recv NOT OK when receiving payload, but that's ok");
Expand Down Expand Up @@ -1513,7 +1529,12 @@ static void websocket_reading(ngx_http_request_t *r) {

//inflate message if needed
if(fsub->deflate.enabled && frame->rsv1) {
if((msgbuf = websocket_inflate_message(fsub, msgbuf, ws_get_msgpool(fsub))) == NULL) {
if((msgbuf = websocket_inflate_message(fsub, msgbuf, ws_get_msgpool(fsub), (uint64_t)clcf->client_max_body_size, &result)) == NULL) {
if(result == -1) {
websocket_send_close_frame_cstr(fsub, CLOSE_POLICY_VIOLATION, "Message too large.");
ws_destroy_msgpool(fsub);
return websocket_reading_finalize(r);
}
websocket_send_close_frame_cstr(fsub, CLOSE_INVALID_PAYLOAD, "Invalid permessage-deflate data");
ws_destroy_msgpool(fsub);
return websocket_reading_finalize(r);
Expand Down Expand Up @@ -1577,7 +1598,7 @@ static void websocket_reading(ngx_http_request_t *r) {

static ngx_flag_t is_utf8(ngx_buf_t *buf) {

u_char *p;
u_char *p, *op;
size_t n;

u_char c, *last;
Expand All @@ -1599,6 +1620,7 @@ static ngx_flag_t is_utf8(ngx_buf_t *buf) {
}

last = p + n;
op = p;

for (len = 0; p < last; len++) {
c = *p;
Expand All @@ -1611,13 +1633,13 @@ static ngx_flag_t is_utf8(ngx_buf_t *buf) {
if (ngx_utf8_decode(&p, last - p) > 0x10ffff) {
/* invalid UTF-8 */
if(mmapped) {
munmap(p, n);
munmap(op, n);
}
return 0;
}
}
if(mmapped) {
munmap(p, n);
munmap(op, n);
}
return 1;
}
Expand Down
15 changes: 14 additions & 1 deletion src/util/nchan_fake_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ static void fakerequest_cleanup_timer_handler(ngx_event_t *ev) {
nchan_finalize_fake_request(d->r, NGX_OK);
}

//see https://github.com/slact/nchan/pull/591
typedef struct {
void *fsub;
ngx_pool_t *pool;
ngx_buf_t *msgbuf;
nchan_fakereq_subrequest_data_t *subrequest;
} ws_publish_data_stub_t;

nchan_fakereq_subrequest_data_t *nchan_requestmachine_request(nchan_requestmachine_t *rm, nchan_requestmachine_request_params_t *params) {
nchan_fakereq_subrequest_data_t *d;
ngx_pool_t *pool = params->pool;
Expand Down Expand Up @@ -468,7 +476,7 @@ nchan_fakereq_subrequest_data_t *nchan_requestmachine_request(nchan_requestmachi
fakebody_buf->last_buf = 1;
fakebody_buf->last_in_chain = 1;
fakebody_buf->flush = 1;
fakebody_buf->memory = 1;
//fakebody_buf->memory = 1; //why were file-cached requests (over 16KB) disabled?

nchan_adjust_subrequest(sr, NGX_HTTP_POST, &POST_REQUEST_STRING, sr_body, sz);
}
Expand All @@ -482,6 +490,11 @@ nchan_fakereq_subrequest_data_t *nchan_requestmachine_request(nchan_requestmachi

nchan_slist_append(&rm->request_queue, d);

//see https://github.com/slact/nchan/pull/591
ws_publish_data_stub_t *pd;
pd = (ws_publish_data_stub_t*)(params->pd);
if(pd) pd->subrequest = d;

nchan_requestmachine_run(rm);
return d;
}
Expand Down
5 changes: 5 additions & 0 deletions src/util/nchan_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ static ngx_int_t nchan_output_filter_generic(ngx_http_request_t *r, nchan_msg_t
if(r->out == NULL) {
if(ctx) {
flush_all_the_reserved_things(ctx);

//Fix crash when using X-Accel-Redirect with message forwarding
//See https://github.com/slact/nchan/pull/591
//There may be a better way to do this...
ngx_http_set_ctx(r->main, NULL, ngx_http_charset_filter_module);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/util/nchan_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ ngx_int_t nchan_msg_buf_open_fd_if_needed(ngx_buf_t *buf, ngx_file_t *file, ngx_
ngx_str_t *msgtag_to_str(nchan_msg_id_t *id);
ngx_str_t *msgid_to_str(nchan_msg_id_t *id);
size_t msgtag_to_strptr(nchan_msg_id_t *id, char *ch);

extern ngx_module_t ngx_http_charset_filter_module;
10 changes: 9 additions & 1 deletion src/util/nchan_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ ngx_buf_t *nchan_common_deflate(ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t
return out;
}

ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool) {
ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool, uint64_t max, int *result) {
ngx_str_t mm_instr = {0, NULL};
int mmapped = 0;
ngx_temp_file_t *tf = NULL;
Expand All @@ -975,6 +975,8 @@ ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r,
unsigned have = 0;
off_t written = 0;
int trailer_appended = 0;

*result = 0;

//input
if(ngx_buf_in_memory(in)) {
Expand Down Expand Up @@ -1018,6 +1020,7 @@ ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r,
}

have = ZLIB_CHUNK - stream->avail_out;
if((uint64_t)have + written > max) break;

if(stream->avail_out == 0 && tf == NULL) {
//if we filled up the buffer, let's start dumping to a file.
Expand All @@ -1033,6 +1036,11 @@ ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r,
munmap(mm_instr.data, mm_instr.len);
}

if((uint64_t)have + written > max) {
*result = -1;
deflateReset(deflate_zstream);
return NULL;
}
if((out = ngx_palloc(pool, sizeof(*out))) == NULL) {
nchan_log_request_error(r, "failed to allocate output buf for deflated message");
deflateReset(deflate_zstream);
Expand Down
2 changes: 1 addition & 1 deletion src/util/nchan_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ ngx_int_t nchan_common_deflate_init(nchan_main_conf_t *mcf);
ngx_buf_t *nchan_common_deflate(ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool);
ngx_int_t nchan_common_simple_deflate_raw_block(ngx_str_t *in, ngx_str_t *out);
ngx_int_t nchan_common_simple_deflate(ngx_str_t *in, ngx_str_t *out);
ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool);
ngx_buf_t *nchan_inflate(z_stream *stream, ngx_buf_t *in, ngx_http_request_t *r, ngx_pool_t *pool, uint64_t max, int *result);

uint64_t nchan_htonll(uint64_t value);
uint64_t nchan_ntohll(uint64_t value);
Expand Down