From b3cc1d2dfc2710092e4cf8bcccf38c91de251a20 Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 5 Mar 2024 11:29:32 +0000 Subject: [PATCH 1/3] Refac --- sources/frontend.c | 147 +++++++++------------------------------------ 1 file changed, 27 insertions(+), 120 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index 7bba5a609..170ba4abf 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -984,16 +984,15 @@ static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size, return msg; } -static od_frontend_status_t od_frontend_deploy_prepared_stmt( - od_server_t *server, od_relay_t *relay, char *ctx, - machine_msg_t *msg /* to adcance or to write? */ +static od_frontend_status_t +od_frontend_deploy_prepared_stmt(od_server_t *server, od_relay_t *relay, + char *ctx, char *data, + int size /* to adcance or to write? */ ) { od_route_t *route = server->route; od_instance_t *instance = server->global->instance; od_client_t *client = server->client; - char *data = machine_msg_data(msg); - int size = machine_msg_size(msg); od_hash_t body_hash = od_murmur_hash(data, size); @@ -1057,6 +1056,16 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt( } } +static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg( + od_server_t *server, od_relay_t *relay, char *ctx, + machine_msg_t *msg /* to adcance or to write? */ +) +{ + return od_frontend_deploy_prepared_stmt(server, relay, ctx, + machine_msg_data(msg), + machine_msg_size(msg)); +} + static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, char *data, int size) { @@ -1138,65 +1147,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ESERVER_WRITE; } - od_hash_t body_hash = - od_murmur_hash(desc->data, desc->len); - - od_debug(&instance->logger, "rewrite describe", client, - server, "statement: %.*s, hash: %08x", - desc->len, desc->data, body_hash); - - char opname[OD_HASH_LEN]; - od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - - int refcnt = 0; - od_hashmap_elt_t value; - value.data = &refcnt; - value.len = sizeof(int); - od_hashmap_elt_t *value_ptr = &value; - - // send parse msg if needed - if (od_hashmap_insert(server->prep_stmts, body_hash, - desc, &value_ptr) == 0) { - od_debug( - &instance->logger, - "rewrite parse before describe", client, - server, - "deploy %.*s operator hash %u to server", - desc->len, desc->data, keyhash); - // rewrite msg - // allocate prepered statement under name equal to body hash - - machine_msg_t *pmsg; - pmsg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc->data, - desc->len); - if (pmsg == NULL) { - return OD_ESERVER_WRITE; - } - - if (instance->config.log_query || - route->rule->log_query) { - od_frontend_log_parse( - instance, client, - "rewrite parse", - machine_msg_data(pmsg), - machine_msg_size(pmsg)); - } - - od_stat_parse(&route->stats); - // msg deallocated here - - machine_iov_add(relay->iov, pmsg); - od_dbg_printf_on_dvl_lvl( - 1, "client relay %p advance msg %c\n", - relay, *(char *)machine_msg_data(pmsg)); - - } else { - int *refcnt; - refcnt = value_ptr->data; - *refcnt = 1 + *refcnt; - - od_stat_parse_reuse(&route->stats); + /* fill internals structs in, send parse if needed */ + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "parse before bind", + data, size) != OD_OK) { + status = OD_ESERVER_WRITE; + break; } machine_msg_t *msg; @@ -1314,61 +1270,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ESERVER_WRITE; } - od_hash_t body_hash = - od_murmur_hash(desc->data, desc->len); - - od_debug(&instance->logger, "rewrite bind", client, - server, "statement: %.*s, hash: %08x", - desc->len, desc->data, body_hash); - - od_hashmap_elt_t value; - int refcnt = 1; - value.data = &refcnt; - value.len = sizeof(int); - od_hashmap_elt_t *value_ptr = &value; - - char opname[OD_HASH_LEN]; - od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - - if (od_hashmap_insert(server->prep_stmts, body_hash, - desc, &value_ptr) == 0) { - od_debug( - &instance->logger, - "rewrite parse before bind", client, - server, - "deploy %.*s operator hash %u to server", - desc->len, desc->data, keyhash); - // rewrite msg - // allocate prepered statement under name equal to body hash - machine_msg_t *pmsg; - pmsg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc->data, - desc->len); - - if (pmsg == NULL) { - return OD_ESERVER_WRITE; - } - - if (instance->config.log_query || - route->rule->log_query) { - od_frontend_log_parse( - instance, client, - "rewrite parse", - machine_msg_data(pmsg), - machine_msg_size(pmsg)); - } - - od_stat_parse(&route->stats); - machine_iov_add(relay->iov, pmsg); - - od_dbg_printf_on_dvl_lvl( - 1, "client relay %p advance msg %c\n", - relay, *(char *)machine_msg_data(pmsg)); - - } else { - int *refcnt = value_ptr->data; - *refcnt = 1 + *refcnt; - od_stat_parse_reuse(&route->stats); + /* fill internals structs in, send parse if needed */ + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "parse before bind", + data, size) != OD_OK) { + status = OD_ESERVER_WRITE; + break; } int opname_start_offset = @@ -1784,7 +1691,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) assert(server->parse_msg != NULL); /* fill internals structs in */ - if (od_frontend_deploy_prepared_stmt( + if (od_frontend_deploy_prepared_stmt_msg( server, &server->relay, "sync-point", server->parse_msg) != OD_OK) { status = OD_ESERVER_WRITE; From 6ccfe3c36219623afe13c7f5925810d5d061bbcd Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 5 Mar 2024 11:46:34 +0000 Subject: [PATCH 2/3] fix --- sources/frontend.c | 62 ++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index 170ba4abf..640476f04 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -959,27 +959,25 @@ static inline od_retcode_t od_frontend_log_bind(od_instance_t *instance, // 8 hex #define OD_HASH_LEN 9 -static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size, - int opname_start_offset, - int operator_name_len, - od_hash_t body_hash) +static inline machine_msg_t * +od_frontend_rewrite_msg(char *data, int size, int opname_start_offset, + int operator_name_len, char *opname, int opnamelen) { machine_msg_t *msg = - machine_msg_create(size - operator_name_len + OD_HASH_LEN); + machine_msg_create(size - operator_name_len + opnamelen); char *rewrite_data = machine_msg_data(msg); // packet header memcpy(rewrite_data, data, opname_start_offset); // prefix for opname - od_snprintf(rewrite_data + opname_start_offset, OD_HASH_LEN, "%08x", - body_hash); + od_snprintf(rewrite_data + opname_start_offset, opnamelen, opname); // rest of msg - memcpy(rewrite_data + opname_start_offset + OD_HASH_LEN, + memcpy(rewrite_data + opname_start_offset + opnamelen, data + opname_start_offset + operator_name_len, size - opname_start_offset - operator_name_len); // set proper size to package kiwi_header_set_size((kiwi_header_t *)rewrite_data, - size - operator_name_len + OD_HASH_LEN); + size - operator_name_len + opnamelen); return msg; } @@ -987,15 +985,13 @@ static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size, static od_frontend_status_t od_frontend_deploy_prepared_stmt(od_server_t *server, od_relay_t *relay, char *ctx, char *data, - int size /* to adcance or to write? */ -) + int size /* to adcance or to write? */, + od_hash_t body_hash, char opname[OD_HASH_LEN]) { od_route_t *route = server->route; od_instance_t *instance = server->global->instance; od_client_t *client = server->client; - od_hash_t body_hash = od_murmur_hash(data, size); - od_hashmap_elt_t desc; desc.data = data; desc.len = size; @@ -1003,9 +999,6 @@ od_frontend_deploy_prepared_stmt(od_server_t *server, od_relay_t *relay, od_debug(&instance->logger, ctx, client, server, "statement: %.*s, hash: %08x", desc.len, desc.data, body_hash); - char opname[OD_HASH_LEN]; - od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - int refcnt = 0; od_hashmap_elt_t value; value.data = &refcnt; @@ -1061,9 +1054,14 @@ static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg( machine_msg_t *msg /* to adcance or to write? */ ) { - return od_frontend_deploy_prepared_stmt(server, relay, ctx, - machine_msg_data(msg), - machine_msg_size(msg)); + char *data = machine_msg_data(msg); + int size = machine_msg_size(msg); + + od_hash_t body_hash = od_murmur_hash(data, size); + char opname[OD_HASH_LEN]; + od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); + return od_frontend_deploy_prepared_stmt(server, relay, ctx, data, size, + body_hash, opname); } static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, @@ -1147,12 +1145,17 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ESERVER_WRITE; } + od_hash_t body_hash = + od_murmur_hash(desc->data, desc->len); + char opname[OD_HASH_LEN]; + od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); + /* fill internals structs in, send parse if needed */ if (od_frontend_deploy_prepared_stmt( server, &server->relay, "parse before bind", - data, size) != OD_OK) { - status = OD_ESERVER_WRITE; - break; + desc->data, desc->len, body_hash, + opname) != OD_OK) { + return OD_ESERVER_WRITE; } machine_msg_t *msg; @@ -1270,12 +1273,17 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ESERVER_WRITE; } + od_hash_t body_hash = + od_murmur_hash(desc->data, desc->len); + char opname[OD_HASH_LEN]; + od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); + /* fill internals structs in, send parse if needed */ if (od_frontend_deploy_prepared_stmt( server, &server->relay, "parse before bind", - data, size) != OD_OK) { - status = OD_ESERVER_WRITE; - break; + desc->data, desc->len, opname, + body_hash) != OD_OK) { + return OD_ESERVER_WRITE; } int opname_start_offset = @@ -1287,8 +1295,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_t *msg; msg = od_frontend_rewrite_msg(data, size, opname_start_offset, - operator_name_len, - body_hash); + operator_name_len, opname, + OD_HASH_LEN); if (msg == NULL) { return OD_ESERVER_WRITE; From 9582db6dd938a71b8e8f12a4515e929002c060d7 Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 6 Mar 2024 19:11:16 +0000 Subject: [PATCH 3/3] FIX --- sources/frontend.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index 640476f04..39c8ba164 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -982,11 +982,10 @@ od_frontend_rewrite_msg(char *data, int size, int opname_start_offset, return msg; } -static od_frontend_status_t -od_frontend_deploy_prepared_stmt(od_server_t *server, od_relay_t *relay, - char *ctx, char *data, - int size /* to adcance or to write? */, - od_hash_t body_hash, char opname[OD_HASH_LEN]) +static od_frontend_status_t od_frontend_deploy_prepared_stmt( + od_server_t *server, od_relay_t *relay, char *ctx, char *data, + int size /* to adcance or to write? */, od_hash_t body_hash, + char *opname, int opnamelen) { od_route_t *route = server->route; od_instance_t *instance = server->global->instance; @@ -1009,16 +1008,16 @@ od_frontend_deploy_prepared_stmt(od_server_t *server, od_relay_t *relay, if (od_hashmap_insert(server->prep_stmts, body_hash, &desc, &value_ptr) == 0) { od_debug(&instance->logger, ctx, client, server, - "deploy %.*s operator %s to server", desc.len, - desc.data, opname); + "deploy %.*s operator %.*s to server", desc.len, + desc.data, opnamelen, opname); // rewrite msg // allocate prepered statement under name equal to body hash od_stat_parse(&route->stats); machine_msg_t *pmsg; - pmsg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc.data, desc.len); + pmsg = kiwi_fe_write_parse_description(NULL, opname, opnamelen, + desc.data, desc.len); if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1061,7 +1060,7 @@ static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg( char opname[OD_HASH_LEN]; od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); return od_frontend_deploy_prepared_stmt(server, relay, ctx, data, size, - body_hash, opname); + body_hash, opname, OD_HASH_LEN); } static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, @@ -1153,8 +1152,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, /* fill internals structs in, send parse if needed */ if (od_frontend_deploy_prepared_stmt( server, &server->relay, "parse before bind", - desc->data, desc->len, body_hash, - opname) != OD_OK) { + desc->data, desc->len, body_hash, opname, + OD_HASH_LEN) != OD_OK) { return OD_ESERVER_WRITE; } @@ -1281,8 +1280,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, /* fill internals structs in, send parse if needed */ if (od_frontend_deploy_prepared_stmt( server, &server->relay, "parse before bind", - desc->data, desc->len, opname, - body_hash) != OD_OK) { + desc->data, desc->len, body_hash, opname, + OD_HASH_LEN) != OD_OK) { return OD_ESERVER_WRITE; }