Skip to content

Commit

Permalink
Fix memory leaks with system server shutdown, fix prepared statemnet…
Browse files Browse the repository at this point in the history
… memeory leak (#458)

Fix memory leaks with system server shutdown. 
Fix prepared statemnet memeory leak. 
Skip volatile param caching.
  • Loading branch information
reshke authored Aug 23, 2022
1 parent 4041401 commit f0ee140
Show file tree
Hide file tree
Showing 16 changed files with 174 additions and 50 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*.swp
.conf.example
.logrotate
.vscode/
sources/odyssey
sources/build.h
stress/odyssey_stress
Expand Down
11 changes: 10 additions & 1 deletion sources/backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ static inline int od_backend_terminate(od_server_t *server)

void od_backend_close_connection(od_server_t *server)
{
/* failed to connect to endpoint, so notring to do */
if (server->io.io == NULL) {
return;
}
if (machine_connected(server->io.io))
od_backend_terminate(server);

Expand Down Expand Up @@ -415,14 +419,15 @@ static inline int od_storage_parse_rw_check_response(machine_msg_t *msg)
goto error;
}

/* we expect exactly one row */
if (resp_len != 1) {
return NOT_OK_RESPONSE;
}
/* pg is in recovery false means db is open for write */
if (pos[0] == 'f') {
return OK_RESPONSE;
}
return NOT_OK_RESPONSE;
/* fallthrough to error */
error:
return NOT_OK_RESPONSE;
}
Expand All @@ -440,6 +445,7 @@ static inline od_retcode_t od_backend_attemp_connect_with_tsa(

rc = od_backend_connect_to(server, context, host, port, opts);
if (rc == NOT_OK_RESPONSE) {
od_backend_close_connection(server);
return rc;
}

Expand All @@ -456,6 +462,7 @@ static inline od_retcode_t od_backend_attemp_connect_with_tsa(
od_backend_close_connection(server);
return NOT_OK_RESPONSE;
}

switch (attrs) {
case OD_TARGET_SESSION_ATTRS_RW:
rc = od_storage_parse_rw_check_response(msg);
Expand Down Expand Up @@ -504,6 +511,7 @@ int od_backend_connect(od_server_t *server, char *context,
OD_TARGET_SESSION_ATTRS_RW,
client) == NOT_OK_RESPONSE) {
/*backend connection not macthed by TSA */
assert(server->io.io == NULL);
continue;
}

Expand Down Expand Up @@ -531,6 +539,7 @@ int od_backend_connect(od_server_t *server, char *context,
OD_TARGET_SESSION_ATTRS_RO,
client) == NOT_OK_RESPONSE) {
/*backend connection not macthed by TSA */
assert(server->io.io == NULL);
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion sources/cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static inline void od_cron_stat(od_cron_t *cron)
startup_errors);

/* request stats per worker */
int i;
uint32_t i;
for (i = 0; i < worker_pool->count; i++) {
od_worker_t *worker = &worker_pool->pool[i];
machine_msg_t *msg;
Expand Down
3 changes: 3 additions & 0 deletions sources/extention.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ static inline od_retcode_t od_extention_free(od_logger_t *l,
od_modules_unload(l, extentions->modules);
}

free(extentions->modules);
extentions->modules = NULL;

return OK_RESPONSE;
}

Expand Down
12 changes: 9 additions & 3 deletions sources/frontend.c
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
size);

if (route->rule->pool->reserve_prepared_statement) {
// skip client parse msg
/* skip client parse msg */
retstatus = OD_SKIP;
kiwi_prepared_statement_t desc;
int rc;
Expand Down Expand Up @@ -1191,14 +1191,19 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
value_ptr->len = desc.description_len;
value_ptr->data = desc.description;

// redeploy
// prev client allocated prepared stmt with same name
/* redeploy
* previous
* client allocated prepared stmt with same name
*/
char buf[OD_HASH_LEN];
od_snprintf(buf, OD_HASH_LEN, "%08x",
body_hash);

msg = kiwi_fe_write_close(
NULL, 'S', buf, OD_HASH_LEN);
if (msg == NULL) {
return OD_ESERVER_WRITE;
}
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger,
Expand Down Expand Up @@ -1289,6 +1294,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
client, server,
"stmt already exists, simply report its ok");
}
machine_msg_free(msg);
}

machine_msg_t *pmsg;
Expand Down
18 changes: 13 additions & 5 deletions sources/grac_shutdown_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ static inline int od_system_server_complete_stop(od_system_server_t *server)

void od_grac_shutdown_worker(void *arg)
{
od_system_t *system = arg;
od_instance_t *instance = system->global->instance;
od_worker_pool_t *worker_pool;
od_system_t *system;
od_instance_t *instance;
od_router_t *router;

system = arg;
worker_pool = system->global->worker_pool;
instance = system->global->instance;
router = system->global->router;

od_log(&instance->logger, "config", NULL, NULL,
"stop to accepting new connections");

Expand All @@ -33,8 +41,6 @@ void od_grac_shutdown_worker(void *arg)
"working with old transactions",
OD_VERSION_NUMBER);

od_router_t *router = system->global->router;

od_list_t *i;
od_list_foreach(&router->servers, i)
{
Expand Down Expand Up @@ -67,6 +73,8 @@ void od_grac_shutdown_worker(void *arg)
server->sid.id);
}

od_worker_pool_wait_gracefully_shutdown(worker_pool);

od_dbg_printf_on_dvl_lvl(1, "shutting down sockets %s\n", "");

/* close sockets */
Expand All @@ -81,6 +89,6 @@ void od_grac_shutdown_worker(void *arg)
od_dbg_printf_on_dvl_lvl(
1, "waiting done, sending sigint to own process %d\n",
instance->pid.pid);

/* start de-initialize process */
kill(instance->pid.pid, SIGTERM);
}
2 changes: 1 addition & 1 deletion sources/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ void od_instance_free(od_instance_t *instance)
// as mallocd on start
free(instance->config_file);
free(instance->exec_path);
od_log(&instance->logger, "shutdown", NULL, NULL, "Stopping Odyssey");
od_logger_close(&instance->logger);
machinarium_free();
}
Expand Down Expand Up @@ -71,6 +70,7 @@ static inline od_retcode_t od_args_init(od_arguments_t *args,
int od_instance_main(od_instance_t *instance, int argc, char **argv)
{
od_arguments_t args;
memset(&args, 0, sizeof(args));
struct argp argp;
od_bind_args(&argp);
od_bind_version();
Expand Down
14 changes: 13 additions & 1 deletion sources/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,20 @@ static inline int od_io_prepare(od_io_t *io, machine_io_t *io_obj,
io->io = io_obj;
int rc;
rc = od_readahead_prepare(&io->readahead, readahead);
if (rc == -1)
if (rc == -1) {
return -1;
}

/* in case we are reusing this io handle, free prev allocated
* cond vars
*/
if (io->on_read) {
machine_cond_free(io->on_read);
}
if (io->on_write) {
machine_cond_free(io->on_write);
}

io->on_read = machine_cond_create();
if (io->on_read == NULL)
return -1;
Expand Down
4 changes: 2 additions & 2 deletions sources/readahead.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ static inline void od_readahead_free(od_readahead_t *readahead)

static inline int od_readahead_prepare(od_readahead_t *readahead, int size)
{
readahead->size = size;
readahead->buf = machine_msg_create(size);
readahead->buf = machine_msg_create_or_advance(readahead->buf, size);
if (readahead->buf == NULL)
return -1;
readahead->size = size;
return 0;
}

Expand Down
33 changes: 33 additions & 0 deletions sources/router.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,41 @@ void od_router_init(od_router_t *router, od_global_t *global)
router->router_err_logger = od_err_logger_create_default();
}

static inline int od_router_immed_close_server_cb(od_server_t *server,
void **argv)
{
od_route_t *route = server->route;
/* remove server for server pool */
od_pg_server_pool_set(&route->server_pool, server, OD_SERVER_UNDEF);

server->route = NULL;
od_backend_close_connection(server);
od_backend_close(server);

return 0;
}

static inline int od_router_immed_close_cb(od_route_t *route, void **argv)
{
od_route_lock(route);
od_server_pool_foreach(&route->server_pool, OD_SERVER_IDLE,
od_router_immed_close_server_cb, argv);
od_route_unlock(route);
return 0;
}

void od_router_free(od_router_t *router)
{
od_list_t *i;
od_list_t *n;
od_list_foreach_safe(&router->servers, i, n)
{
od_system_server_t *server;
server = od_container_of(i, od_system_server_t, link);
od_system_server_free(server);
}

od_router_foreach(router, od_router_immed_close_cb, NULL);
od_route_pool_free(&router->route_pool);
od_rules_free(&router->rules);
pthread_mutex_destroy(&router->lock);
Expand Down
11 changes: 9 additions & 2 deletions sources/sighandler.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ od_system_gracefully_killer_invoke(od_system_t *system)
static inline void od_system_cleanup(od_system_t *system)
{
od_instance_t *instance = system->global->instance;

od_list_t *i;

od_list_foreach(&instance->config.listen, i)
{
od_config_listen_t *listen;
Expand All @@ -44,20 +44,27 @@ static inline void od_system_cleanup(od_system_t *system)
od_attribute_noreturn() void od_system_shutdown(od_system_t *system,
od_instance_t *instance)
{
od_worker_pool_t *worker_pool;

worker_pool = system->global->worker_pool;
od_log(&instance->logger, "system", NULL, NULL,
"SIGINT received, shutting down");

// lock here
od_cron_stop(system->global->cron);

od_worker_pool_stop(system->global->worker_pool);
od_worker_pool_stop(worker_pool);

od_router_free(system->global->router);
/* Prevent OpenSSL usage during deinitialization */
od_worker_pool_wait();

od_extention_free(&instance->logger, system->global->extentions);

od_system_cleanup(system);

/* stop machinaruim and free */
od_instance_free(instance);
exit(0);
}

Expand Down
Loading

0 comments on commit f0ee140

Please sign in to comment.