Skip to content

Commit

Permalink
udp downstream api - work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Alon Blayer-Gat authored and alonbg committed Aug 2, 2016
1 parent 6ed0a0e commit 7ca716c
Show file tree
Hide file tree
Showing 10 changed files with 1,051 additions and 29 deletions.
33 changes: 19 additions & 14 deletions src/ngx_stream_lua_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ static int ngx_stream_lua_socket_receiveuntil_iterator(lua_State *L);
static ngx_int_t ngx_stream_lua_socket_compile_pattern(u_char *data, size_t len,
ngx_stream_lua_socket_compiled_pattern_t *cp, ngx_log_t *log);
static int ngx_stream_lua_socket_cleanup_compiled_pattern(lua_State *L);
static int ngx_stream_lua_req_socket(lua_State *L);
static int ngx_stream_lua_tcp_req_socket(lua_State *L);
static void ngx_stream_lua_req_socket_rev_handler(ngx_stream_session_t *s,
ngx_stream_lua_ctx_t *ctx);
static int ngx_stream_lua_socket_tcp_getreusedtimes(lua_State *L);
Expand Down Expand Up @@ -193,7 +193,7 @@ enum {
static char ngx_stream_lua_req_socket_metatable_key;
#endif
static char ngx_stream_lua_raw_req_socket_metatable_key;
static char ngx_stream_lua_tcp_socket_metatable_key;
static char ngx_stream_lua_socket_tcp_metatable_key;
static char ngx_stream_lua_upstream_udata_metatable_key;
static char ngx_stream_lua_downstream_udata_metatable_key;
static char ngx_stream_lua_pool_udata_metatable_key;
Expand Down Expand Up @@ -276,7 +276,7 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
/* }}} */

/* {{{tcp object metatable */
lua_pushlightuserdata(L, &ngx_stream_lua_tcp_socket_metatable_key);
lua_pushlightuserdata(L, &ngx_stream_lua_socket_tcp_metatable_key);
lua_createtable(L, 0 /* narr */, 11 /* nrec */);

lua_pushcfunction(L, ngx_stream_lua_socket_tcp_connect);
Expand Down Expand Up @@ -364,14 +364,6 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
}


void
ngx_stream_lua_inject_req_socket_api(lua_State *L)
{
lua_pushcfunction(L, ngx_stream_lua_req_socket);
lua_setfield(L, -2, "socket");
}


static int
ngx_stream_lua_socket_tcp(lua_State *L)
{
Expand All @@ -397,7 +389,7 @@ ngx_stream_lua_socket_tcp(lua_State *L)
| NGX_STREAM_LUA_CONTEXT_TIMER);

lua_createtable(L, 3 /* narr */, 1 /* nrec */);
lua_pushlightuserdata(L, &ngx_stream_lua_tcp_socket_metatable_key);
lua_pushlightuserdata(L, &ngx_stream_lua_socket_tcp_metatable_key);
lua_rawget(L, LUA_REGISTRYINDEX);
lua_setmetatable(L, -2);

Expand Down Expand Up @@ -3919,14 +3911,22 @@ ngx_stream_lua_socket_cleanup_compiled_pattern(lua_State *L)
}


void
ngx_stream_lua_inject_tcp_req_socket_api(lua_State *L)
{
lua_pushcfunction(L, ngx_stream_lua_tcp_req_socket);
lua_setfield(L, -2, "socket");
}


static int
ngx_stream_lua_req_socket(lua_State *L)
ngx_stream_lua_tcp_req_socket(lua_State *L)
{
int n, raw;
ngx_stream_session_t *s;
ngx_peer_connection_t *pc;
ngx_stream_lua_srv_conf_t *lscf;
ngx_connection_t *c;
ngx_stream_session_t *s;
ngx_stream_lua_ctx_t *ctx;
ngx_stream_lua_co_ctx_t *coctx;
ngx_stream_lua_cleanup_t *cln;
Expand Down Expand Up @@ -3957,6 +3957,11 @@ ngx_stream_lua_req_socket(lua_State *L)

c = s->connection;

if (c->type != SOCK_STREAM) {
return luaL_error(L, "socket api does not match connection transport",
lua_gettop(L));
}

#if !defined(nginx_version) || nginx_version < 1003013
lua_pushnil(L);
lua_pushliteral(L, "nginx version too old");
Expand Down
2 changes: 1 addition & 1 deletion src/ngx_stream_lua_socket_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ typedef struct {


void ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L);
void ngx_stream_lua_inject_req_socket_api(lua_State *L);
void ngx_stream_lua_inject_tcp_req_socket_api(lua_State *L);
void ngx_stream_lua_cleanup_conn_pools(lua_State *L);


Expand Down
223 changes: 219 additions & 4 deletions src/ngx_stream_lua_socket_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ static int ngx_stream_lua_socket_udp_setpeername(lua_State *L);
static int ngx_stream_lua_socket_udp_send(lua_State *L);
static int ngx_stream_lua_socket_udp_receive(lua_State *L);
static int ngx_stream_lua_socket_udp_settimeout(lua_State *L);
static int ngx_stream_lua_udp_req_socket(lua_State *L);
static void ngx_stream_lua_socket_udp_finalize(ngx_stream_session_t *s,
ngx_stream_lua_socket_udp_upstream_t *u);
static int ngx_stream_lua_socket_udp_downstream_destroy(lua_State *L);
static int ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L);
static int ngx_stream_lua_socket_resolve_retval_handler(ngx_stream_session_t *s,
ngx_stream_lua_socket_udp_upstream_t *u, lua_State *L);
Expand Down Expand Up @@ -69,8 +71,13 @@ enum {
};


#if 0
static char ngx_stream_lua_req_socket_metatable_key;
#endif
static char ngx_stream_lua_raw_req_socket_metatable_key;
static char ngx_stream_lua_socket_udp_metatable_key;
static char ngx_stream_lua_udp_udata_metatable_key;
static char ngx_stream_lua_upstream_udata_metatable_key;
static char ngx_stream_lua_downstream_udata_metatable_key;
static u_char ngx_stream_lua_socket_udp_buffer[UDP_MAX_DATAGRAM_SIZE];


Expand Down Expand Up @@ -106,18 +113,206 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L)
lua_rawset(L, LUA_REGISTRYINDEX);
/* }}} */

/* udp socket object metatable */
lua_pushlightuserdata(L, &ngx_stream_lua_udp_udata_metatable_key);
/* {{{upstream userdata metatable */
lua_pushlightuserdata(L, &ngx_stream_lua_upstream_udata_metatable_key);
lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
lua_pushcfunction(L, ngx_stream_lua_socket_udp_upstream_destroy);
lua_setfield(L, -2, "__gc");
lua_rawset(L, LUA_REGISTRYINDEX);
/* }}} */

/* {{{downstream userdata metatable */
lua_pushlightuserdata(L, &ngx_stream_lua_downstream_udata_metatable_key);
lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
lua_pushcfunction(L, ngx_stream_lua_socket_udp_downstream_destroy);
lua_setfield(L, -2, "__gc");
lua_rawset(L, LUA_REGISTRYINDEX);
/* }}} */

#if 0
/* {{{udp req socket object metatable */
lua_pushlightuserdata(L, &ngx_stream_lua_req_socket_metatable_key);
lua_createtable(L, 0 /* narr */, 3 /* nrec */);

lua_pushcfunction(L, ngx_stream_lua_socket_udp_receive);
lua_setfield(L, -2, "receive");

lua_pushcfunction(L, ngx_stream_lua_socket_udp_settimeout);
lua_setfield(L, -2, "settimeout"); /* ngx socket mt */

lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");

lua_rawset(L, LUA_REGISTRYINDEX);
/* }}} */
#endif

/* {{{raw udp req socket object metatable */
lua_pushlightuserdata(L, &ngx_stream_lua_raw_req_socket_metatable_key);
lua_createtable(L, 0 /* narr */, 4 /* nrec */);

lua_pushcfunction(L, ngx_stream_lua_socket_udp_receive);
lua_setfield(L, -2, "receive");

lua_pushcfunction(L, ngx_stream_lua_socket_udp_send);
lua_setfield(L, -2, "send");

lua_pushcfunction(L, ngx_stream_lua_socket_udp_settimeout);
lua_setfield(L, -2, "settimeout"); /* ngx socket mt */

lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
lua_rawset(L, LUA_REGISTRYINDEX);
/* }}} */

lua_pop(L, 1);
}


void
ngx_stream_lua_inject_udp_req_socket_api(lua_State *L)
{
lua_pushcfunction(L, ngx_stream_lua_udp_req_socket);
lua_setfield(L, -2, "udp_socket");
}


static int
ngx_stream_lua_udp_req_socket(lua_State *L)
{
int n, raw;
ngx_stream_session_t *s;
ngx_stream_lua_udp_connection_t *pc;
ngx_stream_lua_srv_conf_t *lscf;
ngx_connection_t *c;
ngx_stream_lua_ctx_t *ctx;
ngx_stream_lua_co_ctx_t *coctx;
ngx_stream_lua_cleanup_t *cln;

ngx_stream_lua_socket_udp_upstream_t *u;

n = lua_gettop(L);
if (n == 0) {
raw = 0;

} else if (n == 1) {
raw = lua_toboolean(L, 1);
lua_pop(L, 1);

} else {
return luaL_error(L, "expecting zero arguments, but got %d",
lua_gettop(L));
}

s = ngx_stream_lua_get_session(L);

ctx = ngx_stream_get_module_ctx(s, ngx_stream_lua_module);
if (ctx == NULL) {
return luaL_error(L, "no ctx found");
}

ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT);

c = s->connection;

if (c->type != SOCK_DGRAM) {
return luaL_error(L, "socket api does not match connection transport",
lua_gettop(L));
}

#if !defined(nginx_version) || nginx_version < 1003013
lua_pushnil(L);
lua_pushliteral(L, "nginx version too old");
return 2;
#else
if (ctx->downstream_busy_bufs) {
lua_pushnil(L);
lua_pushliteral(L, "pending data to write");
return 2;
}

dd("ctx acquired raw req socket: %d", ctx->acquired_raw_req_socket);

if (ctx->acquired_raw_req_socket) {
lua_pushnil(L);
lua_pushliteral(L, "duplicate call");
return 2;
}

ctx->acquired_raw_req_socket = 1;
#endif

lua_createtable(L, 3 /* narr */, 1 /* nrec */); /* the object */

lua_pushlightuserdata(L, &ngx_stream_lua_raw_req_socket_metatable_key);

lua_rawget(L, LUA_REGISTRYINDEX);
lua_setmetatable(L, -2);

u = lua_newuserdata(L, sizeof(ngx_stream_lua_socket_udp_upstream_t));
if (u == NULL) {
return luaL_error(L, "no memory");
}

#if 1
lua_pushlightuserdata(L, &ngx_stream_lua_downstream_udata_metatable_key);
lua_rawget(L, LUA_REGISTRYINDEX);
lua_setmetatable(L, -2);
#endif

lua_rawseti(L, 1, SOCKET_CTX_INDEX);

ngx_memzero(u, sizeof(ngx_stream_lua_socket_udp_upstream_t));

u->raw_downstream = 1;

coctx = ctx->cur_co_ctx;

u->session = s;

lscf = ngx_stream_get_module_srv_conf(s, ngx_stream_lua_module);

u->conf = lscf;

u->read_timeout = u->conf->read_timeout;

cln = ngx_stream_lua_cleanup_add(s, 0);
if (cln == NULL) {
u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
lua_pushnil(L);
lua_pushliteral(L, "no memory");
return 2;
}

cln->handler = ngx_stream_lua_socket_udp_cleanup;
cln->data = u;

u->cleanup = &cln->handler;

pc = &u->udp_connection;
pc->log = *c->log;
pc->connection = c;

dd("setting data to %p", u);

coctx->data = u;
ctx->downstream = u;

if (c->read->timer_set) {
ngx_del_timer(c->read);
}

if (raw) {
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
}

lua_settop(L, 1);
return 1;
}


static int
ngx_stream_lua_socket_udp(lua_State *L)
{
Expand Down Expand Up @@ -259,7 +454,7 @@ ngx_stream_lua_socket_udp_setpeername(lua_State *L)
}

#if 1
lua_pushlightuserdata(L, &ngx_stream_lua_udp_udata_metatable_key);
lua_pushlightuserdata(L, &ngx_stream_lua_upstream_udata_metatable_key);
lua_rawget(L, LUA_REGISTRYINDEX);
lua_setmetatable(L, -2);
#endif
Expand Down Expand Up @@ -1107,6 +1302,26 @@ ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L)
}


static int
ngx_stream_lua_socket_udp_downstream_destroy(lua_State *L)
{
ngx_stream_lua_socket_udp_upstream_t *u;

dd("downstream destory");

u = lua_touserdata(L, 1);
if (u == NULL) {
dd("u is NULL");
return 0;
}

if (u->cleanup) {
ngx_stream_lua_socket_udp_cleanup(u); /* it will clear u->cleanup */
}

return 0;
}

static void
ngx_stream_lua_socket_dummy_handler(ngx_stream_session_t *s,
ngx_stream_lua_socket_udp_upstream_t *u)
Expand Down
13 changes: 7 additions & 6 deletions src/ngx_stream_lua_socket_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,19 @@ struct ngx_stream_lua_socket_udp_upstream_s {

ngx_stream_lua_resolved_t *resolved;

ngx_uint_t ft_type;
ngx_err_t socket_errno;
size_t received; /* for receive */
size_t recv_buf_size;
ngx_uint_t ft_type;
ngx_err_t socket_errno;
size_t received; /* for receive */
size_t recv_buf_size;

ngx_stream_lua_co_ctx_t *co_ctx;

unsigned waiting; /* :1 */
unsigned raw_downstream:1;
unsigned waiting; /* :1 */
};


void ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L);
void ngx_stream_lua_inject_udp_req_socket_api(lua_State *L);


#endif /* _NGX_STREAM_LUA_SOCKET_UDP_H_INCLUDED_ */
Loading

0 comments on commit 7ca716c

Please sign in to comment.